Multi-Threading and Spring Transactions
Developers apply the @Transactional annotation provided by the Spring framework and rely on its transaction-management mechanisms. Is this enough?
As developers, we are used to applying the @Transactional annotation provided by the Spring framework and relying on the mechanisms it implements for transaction management. But is this enough?
Well, the answer is clear: No.
Spring takes care of all the underlying transaction-management details and provides a consistent programming model across different transaction APIs, but how many people really understand how it behaves in a multi-threaded environment? Is it possible to open a transaction and write data from multiple threads? According to some forum posts, the answer is: yes, it is! According to the framework, however, it is not.
Let's take a step back and think about the EntityManager. The EntityManager works with a session, a cache of objects managed by it. This means it has state, and sharing that state between several threads can lead to race conditions; so, rule number one is to use one EntityManager per thread.
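A minimal plain-Java sketch of rule number one, using a hypothetical `Session` class as a stand-in for the stateful `EntityManager` (in a real application each thread would obtain its own instance from an `EntityManagerFactory`): a `ThreadLocal` hands every thread its own instance instead of one shared object.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PerThreadSessionDemo {

    // Stand-in for a stateful EntityManager, which must not be shared across threads
    static class Session { }

    // Mimics "one EntityManager per thread": each thread lazily gets its own instance
    private static final ThreadLocal<Session> SESSIONS = ThreadLocal.withInitial(Session::new);

    // Runs nThreads threads and counts how many distinct Session instances they saw
    static int distinctSessions(int nThreads) throws InterruptedException {
        Set<Session> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new Thread(() -> seen.add(SESSIONS.get()));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return seen.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(distinctSessions(2)); // prints 2: one Session per thread
    }
}
```

Two threads never observe the same instance, which is exactly the isolation the rule is after.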
As a matter of fact, Spring takes care of keeping the transactional context per thread. Suppose we want to process in parallel a list of objects and store them in the database. We want to group those objects in dedicated chunks and pass each chunk to a processing method in a separate thread. Then, the results processed in each thread should be collected and presented to the user.
I will start with the definition of a service interface responsible for the processing, which knows nothing about the parallel execution we would like to implement:
/**
 * Service interface defining the contract for processing object identifiers
 */
public interface ProcessingService {

    /**
     * Processes the list of objects identified by id and returns the
     * identifiers of the successfully processed objects
     *
     * @param objectIds List of object identifiers
     *
     * @return identifiers list of the successfully processed objects
     */
    List<Integer> processObjects(List<Integer> objectIds);
}
The default implementation of this service is based on database storage. However, the example is very simplistic:
/**
 * Service implementation for database-related id processing
 */
@Service("ProcessingDBService")
public class ProcessingDBService implements ProcessingService {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Transactional
    @Override
    public List<Integer> processObjects(List<Integer> objectIds) {
        // Process and save to DB
        logger.info("Running in thread " + Thread.currentThread().getName()
                + " with object ids " + objectIds.toString());
        return objectIds.stream().collect(Collectors.toList());
    }
}
Now, we would like to have the option to run this processing in chunks and in parallel. To keep the code clean and decoupled from its runtime context, we will use the Decorator pattern as follows:
/**
 * Service implementation for parallel chunk processing
 */
@Service
@Primary
@ConditionalOnProperty(prefix = "service", name = "parallel", havingValue = "true")
public class ProcessingServiceParallelRunDecorator implements ProcessingService {

    private final ProcessingService delegate;

    public ProcessingServiceParallelRunDecorator(ProcessingService delegate) {
        this.delegate = delegate;
    }

    /**
     * In a real scenario this should be an external configuration
     */
    private int batchSize = 10;

    @Override
    public List<Integer> processObjects(List<Integer> objectIds) {
        List<List<Integer>> chunks = getBatches(objectIds, batchSize);
        List<List<Integer>> processedObjectIds = chunks.parallelStream()
                .map(delegate::processObjects)
                .collect(Collectors.toList());
        return flatList(processedObjectIds);
    }

    private List<List<Integer>> getBatches(List<Integer> collection, int batchSize) {
        return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)
                .mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))
                .collect(Collectors.toList());
    }

    private List<Integer> flatList(List<List<Integer>> listOfLists) {
        return listOfLists.stream().collect(ArrayList::new, List::addAll, List::addAll);
    }
}
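The splitting logic can be tried out in isolation. A small standalone sketch (the `Batcher` class name is mine, not part of the project) shows how twelve ids are split into a chunk of ten and a chunk of two:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Batcher {

    // Same splitting logic as getBatches in the decorator (requires Java 9+ for
    // the three-argument IntStream.iterate)
    static List<List<Integer>> getBatches(List<Integer> collection, int batchSize) {
        return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)
                .mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        // With batchSize 10, twelve ids become a chunk of 10 and a chunk of 2
        System.out.println(getBatches(ids, 10));
        // prints [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [11, 12]]
    }
}
```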
The actual calls are delegated to the processing service implementation, but the decorator takes care of distributing the work across threads and collecting the results. The Decorator pattern keeps the client code unaware of the actual implementation: either the single-threaded or the multi-threaded version can be injected without any change to the client code.
To understand what the client code might look like, let's take a look at a simple unit test:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(properties = { "service.parallel=false" })
public class ProcessingServiceTest {

    @Autowired
    ProcessingService processingService;

    ProcessingService processingServiceDecorator;

    @Test
    public void shouldRunParallelProcessingUsingDecorator() {
        processingServiceDecorator = new ProcessingServiceParallelRunDecorator(processingService);
        List<Integer> objectIds = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        List<Integer> resultList = processingServiceDecorator.processObjects(objectIds);
        Assert.assertEquals(objectIds, resultList);
    }
}
The code passes a list of objectIds and runs the Decorator service explicitly created in the test. Because the chunk size is internally configured to 10, two threads are expected to process the data. Checking the logs, we see the following rows:
ProcessingDBService: Running in thread ForkJoinPool.commonPool-worker-3 with object ids [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ProcessingDBService: Running in thread main with object ids [11, 12]
The parallel stream used exactly two threads: the main thread, which also takes part in the processing, and one worker thread from the common ForkJoinPool.
One important aspect of this processing is transaction handling. The first 10 elements were processed in one transaction, while the last 2 were processed in another one. If you take a look at the ProcessingDBService, you will see that the public method is annotated with the @Transactional annotation. This is how Spring is expected to work: it holds the transactional context per thread in dedicated ThreadLocal objects and does not support running multiple threads in one transaction.
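A simplified plain-Java illustration (not Spring's actual code) of why the transactional context does not cross thread boundaries: a value bound to a ThreadLocal in one thread is invisible to a freshly started worker thread, just as a transaction Spring binds to the calling thread is not visible to the ForkJoinPool workers of a parallel stream.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadLocalContextDemo {

    // Stand-in for Spring's per-thread transactional context
    private static final ThreadLocal<String> TX_CONTEXT = new ThreadLocal<>();

    // Binds a context to the current thread, then reports what a new worker thread sees
    static String contextSeenByNewThread() throws InterruptedException {
        TX_CONTEXT.set("tx-bound-to-caller");
        AtomicReference<String> seenByWorker = new AtomicReference<>();
        Thread worker = new Thread(() -> seenByWorker.set(TX_CONTEXT.get()));
        worker.start();
        worker.join();
        return seenByWorker.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // The worker does not inherit the caller's ThreadLocal binding
        System.out.println(contextSeenByNewThread()); // prints null
    }
}
```

This is why each chunk in our example runs in its own transaction: every thread that enters the @Transactional method gets its own context.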
Error handling was not covered in this article but will be targeted in a subsequent one. One more note: the dependency injection in the Decorator class is based on constructor injection. For the bean to be managed by a Spring container, you might need to use @Qualifier in the constructor.
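For example, the decorator's constructor could disambiguate the delegate bean like this (a sketch; the qualifier value assumes the bean name given in the @Service annotation above):

```java
public ProcessingServiceParallelRunDecorator(
        @Qualifier("ProcessingDBService") ProcessingService delegate) {
    this.delegate = delegate;
}
```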
The code is part of a small project posted on GitHub.