Understanding Structured Concurrency in Java
Java's structured concurrency simplifies concurrent programming by automating cancellation, preventing thread leaks, and improving task management.
Join the DZone community and get the full member experience.
Join For FreeTypically, complexity in programming is managed by breaking down tasks into subtasks. These subtasks can then be executed concurrently.
Since Java 5, ExecutorService
API helps the programmer execute these subtasks concurrently. However, given the nature of concurrent execution, each subtask could fail or succeed independently with no implicit communication between them. The failure of one subtask does not automatically cancel the other subtasks. Although an attempt can be made to manage this cancellation manually via external handling, it's quite tricky to get it right — especially when a large number of subtasks are involved.
This could potentially result in loose threads (alternatively known as thread leaks). Although virtual threads are a cost-effective way to dedicate a thread to each (sub)task, managing their results remains a challenge.
Executor service allows one thread to create it and another thread to submit tasks to this executor service. The threads that perform the tasks bear no relation to both of these threads. Moreover, a completely different thread can await the result of execution from these subtasks via reference to its Future
object — which is provided immediately upon task submission to the executor service. Thus, effectively, the subtask does not have to return to the task that submitted it. It could possibly return to multiple threads or none.
Also, the relationship between a task and its subtask is only logical and not visible in the code structure. There is no enforcement or tracking of the task-subtask relationship in runtime.
Structured concurrency in Java aims to solve all of the above challenges by:
- Reliably automating the cancellation of subtasks; avoiding thread leaks and delays.
- Ensuring a (sub)task return only to the thread that submitted it.
- Enforcing a structured relation between a task and its subtasks — which could be nested as well.
Unstructured Concurrency
Let's understand more about the current situation of unstructured concurrency in Java. Consider a function handle()
which fetches user information and its associated order.
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = esvc.submit(() -> findUser());
Future<Integer> order = esvc.submit(() -> fetchOrder());
String theUser = user.get(); // Join findUser
int theOrder = order.get(); // Join fetchOrder
return new Response(theUser, theOrder);
}
At first glance, the code seems simple and does what it intends to do. However, on closer look, we could identify multiple issues:
If findUser()
throws exception then the thread running fetchOrder()
leaks as the latter has no information or knowledge about the former’s execution status.
- If thread running
handle()
is interrupted, then both the threads runningfindUser()
andfetchOrder()
leaks as these threads, too, have no knowledge of the thread that spawned them. - If
findUser()
took too long, and meanwhilefetchOrder()
fails, the failure would only be identified whenorder.get()
is invoked. - Although the code is structured as task related to its subtask, this relation is merely logical and is neither explicitly described at compile time nor enforced during runtime.
The first three situations arise due to the lack of an automated mechanism for cancelling the other threads. This potentially leads to resource wastage (as threads continue to execute), cancellation delays, and, at worst, these leaked threads may interfere with other threads, too. Although we may attempt to handle the cancellation manually, not only is it tricky to get it right, but it complicates the overall program and creates more room for errors.
The fourth situation arises due to a lack of formal syntax, which binds the threads into a parent-child relationship for their task to subtask hierarchy. The Future
object provided are unhelpful too in this case.
All these limitations are due to the unstructured nature of concurrency via ExecutorService
and Future
which lacks an automated way of canceling or tracking tasks and subtasking relationships.
Structured Concurrency
Structured concurrency principles that:
If a task splits into concurrent subtasks then they all return to the same place, namely the task’s code block.
In structured concurrency, the task awaits the subtasks’ results and monitors them for failures. APIs also define well-defined entry and exit points for the flow of execution through a block of code. APIs also help enforce a strict nesting of the lifetimes of operations in a way that mirrors their syntactic nesting in the code.
Structured concurrency has a natural synergy with virtual threads. A new virtual thread can be dedicated to every task, and when a task fans out by submitting subtasks for concurrent execution, it can dedicate a new virtual thread to each subtask. Moreover, the task-subtask relationship has a tree structure for each virtual thread to carry a reference to its unique parent.
While virtual threads deliver an abundance of threads, structured concurrency can correctly and robustly coordinate them. This also enables observability tools to display threads as they are understood by the developer.
StructuredTaskScope
In a structured concurrency API, StructuredTaskScope
is the principal class.
The earlier example of handle()
function rewritten with StructuredTaskScope
is shown below.
Response handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<String> user = scope.fork(() -> findUser());
Supplier<Integer> order = scope.fork(() -> fetchOrder());
scope.join() // Join both sub-tasks
.throwIfFailed(); // ... and propagate errors
// Here, both sub-tasks have succeeded, so compose their results
return new Response(user.get(), order.get());
}
}
With use of these APIs we achieve all of the below, which addresses the shortcomings of the unstructured concurrency discussed so far.
- On failure of either
findUser()
orfetchOrder()
other is automatically cancelled; in case not completed yet. - In case the thread running
handle()
is interrupted before or during invocation tojoin()
both the subtask viz.findUser()
orfetchOrder()
are cancelled; in case not completed yet. - The task structure and code mirror each other with abundant clarity. The
scope
is considered a task (parent) while thefork
are subtasks (children). The task waits for subtasks to complete or be cancelled and decide to succeed or fail with no overhead of lifecycle management. - An additional benefit earned via the hierarchy of task to subtask is major improvement in observability. The call stack or thread dump clearly displays the relationship between the
handle()
tofindUser()
andfetchOrder()
which can easily be understood by the developer.
With the automatic cancellation/cleanup achieved via ShutdownOnFailure
policy, thread leaks are avoided altogether. This policy can be customized or replaced with other available policies.
Below are a few of the important characteristics of StructuredTaskScope
a developer should be aware of
- As of JDK 24, this is still a preview feature and is thus disabled by default.
- The thread that creates the scope is its owner.
- With every invocation of
fork(…)
new virtual thread is started, for execution of subtask, by default. - A subtask can create its own nested
StructuredTaskScope
to fork its own subtasks, thus creating a hierarchy. Once the scope is closed, all of its subtasks are guaranteed to be terminated, ensuring that no threads are leaked. - The scope owner or any of its (nested) subtasks can invoke
shutdown()
on scope. Thus initiating cancellation of other unfinished subtasks. Moreover, forking of new subtasks is prevented too. - If
shutdown()
orfork()
is invoked by thread which either is non-owner or not part of scope hierarchy,WrongThreadException
is thrown. - Calling either
join()
orjoinUntil(Instant)
within a scope is mandatory. If a scope's block exits before joining, then the scope will wait for all subtasks to terminate and then throw an exception. - When
join()
completes successfully, then each of the subtasks has either completed successfully, failed, or been cancelled because the scope was shut down. StructuredTaskScope
enforces structure and order upon concurrent operations. Thus, using a scope outside of atry-with-resources
block and returning without callingclose()
, or without maintaining the proper nesting ofclose()
calls, may cause the scope's methods to throw aStructureViolationException
.
Shutdown Policies
To avoid any unnecessary processing during concurrent subtask execution, it is common to use short-circuiting patterns. In a structured concurrency shutdown, policies can be utilized for such requirements.
Two subclasses of StructuredTaskScope
, ShutdownOnFailure
and ShutdownOnSuccess
, support these patterns with policies that shut down the scope when the first subtask fails or succeeds, respectively.
The earlier example already shows the usage of ShutdownOnFailure
while below example exhibits usage of ShutdownOnSuccess
which returns the result of the first successful subtask. A real-world example would be to query from multiple servers for a response and return results of the very first response from any of the servers (tail chopping).
<T> T race(List<Callable<T>> tasks, Instant deadline)
throws InterruptedException, ExecutionException, TimeoutException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
for (var task : tasks) {
scope.fork(task);
}
return scope.joinUntil(deadline)
.result(); // Throws if none of the sub-tasks completed successfully
}
}
Note that as soon as one subtask succeeds, this scope automatically shuts down, canceling unfinished subtasks. The task fails if all of the subtasks fail or if the given deadline elapses.
While these two shutdown policies are provided out of the box, they can be customized as per requirements.
Processing Results
Shutdown policies additionally provide centralized methods for handling exceptions and, possibly, successful results. This is in line with the spirit of structured concurrency, according to which an entire scope is treated as a unit.
The scope’s owner can process the results of the subtasks using the Subtask
objects returned from the calls to fork(...)
if they are not processed by the policy.
If the shutdown policy itself processes subtask results — as in the case of ShutdownOnSuccess
— then the Subtask
objects returned by fork(...)
should be avoided altogether, and the fork(...)
method treated as if it returned void
. Subtasks should return, as their result, any information that the scope owner should process after centralized exception handling by the policy.
Structured Concurrency in Various Programming Languages
While structured concurrency is still being previewed in Java, it is already available in various programming languages. Here is sneak peak for few of these languages.
Kotlin
- Coroutines. Kotlin offers coroutines, which are a lightweight concurrency model that allows for asynchronous programming without blocking threads. Coroutines provide structured concurrency through scopes, ensuring that tasks are properly managed and canceled when necessary.
- Structured concurrency. Kotlin’s structured concurrency is built into its coroutine framework, making it easy to write concurrent code that is both efficient and easy to understand.
Go
- Goroutines. Go uses goroutines, which are lightweight threads managed by the Go runtime. Goroutines can be easily created and managed, allowing for high concurrency.
- Channels. Go provides channels for communication between goroutines, enabling structured concurrency by ensuring that tasks can communicate and synchronize effectively.
Python
- Asyncio. Python’s asyncio library provides support for asynchronous programming using coroutines. Asyncio allows for structured concurrency through the use of tasks and event loops, ensuring that tasks are properly managed and canceled when necessary.
- Task groups. Python’s asyncio library includes task groups, which provide a way to manage and cancel groups of tasks together, ensuring that tasks are properly coordinated.
C#
- Async/Await. C# provides support for asynchronous programming using the async and await keywords. This allows for structured concurrency by ensuring that tasks are properly managed and cancelled when necessary.
- Task Parallel Library (TPL). The TPL provides support for parallel programming in C#, allowing for the creation and management of tasks in a structured manner.
Conclusion
In conclusion, structured concurrency in Java represents a significant evolution in concurrent programming, addressing many of the shortcomings found in traditional unstructured concurrency models.
Key advantages of structured concurrency in Java include:
- Automated cancellation. Subtasks are automatically cancelled upon failure, reducing resource wastage and eliminating the complexity of manual cancellation handling.
- Clear task hierarchy. The hierarchical task structure enhances code readability, maintainability, and observability, making it easier to debug and monitor concurrent operations.
- Improved error handling. Centralized error handling through structured concurrency ensures predictable and robust behavior in the presence of exceptions.
- Enhanced observability. The clear parent-child relationships displayed in thread dumps and call stacks aid developers in understanding and managing concurrent tasks.
- Virtual threads. The synergy with virtual threads allows for efficient and scalable concurrent programming, making it possible to handle a large number of concurrent tasks without the overhead of traditional threads.
By adopting structured concurrency, Java developers can write more efficient, reliable, and maintainable concurrent code, ultimately leading to better software quality and improved developer productivity.
References and Further Reads
Published at DZone with permission of Ammar Husain. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments