How to Synchronize Blocks by the Value of the Object in Java

Ever had trouble synchronizing blocks of code?

The Problem

Sometimes, we need to synchronize blocks of code by the value of a variable.

To understand this problem, consider a simple banking application that performs the following operations each time a client transfers money:

  1. Evaluates the cash-back amount for this transfer using an external web service (CashBackService)
  2. Performs a money transfer in the database (AccountService)
  3. Updates the data in the cash-back evaluation system (CashBackService)

The money transfer operation looks like this:

public void withdrawMoney(UUID userId, int amountOfMoney) {
    synchronized (userId) {  
        Result result = externalCashBackService.evaluateCashBack(userId, amountOfMoney); 
        accountService.transfer(userId, amountOfMoney + result.getCashBackAmount()); 
        externalCashBackService.cashBackComplete(userId, result.getCashBackAmount()); 
    }
}


The base components of the application are shown in the following diagram:

component diagram of the application

I tried to make the example as clear as possible. The transfer of money in the payment service depends on two other services:

  • The first is a CashBackService that interacts with another (external) web application over REST. To calculate the actual cash-back, we need to synchronize transactions with this application, because the next cash-back amount may depend on the total amount of the user’s payments.
  • The second is an AccountService that communicates with an internal database and stores data related to its users’ accounts. In this service, we can use a JPA transaction to perform several actions as one atomic operation in the database.

In real life, I’d strongly recommend refactoring such systems to avoid this situation, if possible. But in our example, imagine that we have no choice.

Let’s look at the draft code of this application:

@Service
public class PaymentService {

    @Autowired
    private ExternalCashBackService externalCashBackService;

    @Autowired
    private AccountService accountService;

    public void withdrawMoney(UUID userId, int amountOfMoney) {
        synchronized (userId) {  
            Result result = externalCashBackService.evaluateCashBack(userId, amountOfMoney); 
            accountService.transfer(userId, amountOfMoney + result.getCashBackAmount()); 
            externalCashBackService.cashBackComplete(userId, result.getCashBackAmount()); 
        }
    }
}


@Service
public class ExternalCashBackService {

    @Autowired
    private RestTemplate restTemplate;

    public Result evaluateCashBack(UUID userId, int amountOfMoney) {
        return sendRestRequest("evaluate", userId, amountOfMoney);
    }

    public Result cashBackComplete(UUID userId, int cashBackAmount) {
        return sendRestRequest("complete", userId, cashBackAmount);
    }

    private Result sendRestRequest(String action, UUID userId, int value) {

        URI externalCashBackSystemUrl =
                URI.create("http://cash-back-system.org/api/" + action); 

        HttpHeaders headers = new HttpHeaders();
        headers.set("Accept", MediaType.APPLICATION_JSON_VALUE);
        RequestDto requestDto = new RequestDto(userId, value);
        HttpEntity<?> request = new HttpEntity<>(requestDto, headers);

        // POST is used here because the request carries a body (requestDto)
        ResponseDto responseDto = restTemplate.exchange(externalCashBackSystemUrl,
                                                        HttpMethod.POST,
                                                        request,
                                                        ResponseDto.class)
                                              .getBody();

        return new Result(responseDto.getStatus(), responseDto.getValue());
    }
}

@Service
public class AccountService {

    @Autowired
    private AccountRepository accountRepository;

    @Transactional(isolation = REPEATABLE_READ)  
    public void transfer(UUID userId, int amountOfMoney) {
        Account account = accountRepository.getOne(userId);
        account.setBalance(account.getBalance() - amountOfMoney);
        accountRepository.save(account);
    }
}


However, several objects can have the same value (userId in this example), and synchronization works on the object instance, not on its value.

The code below is incorrectly synchronized: the static factory method UUID.fromString(..) creates a new instance of the UUID class on each call, even if you pass an equal string argument.

So, we get different UUID instances for equal keys. If we run this code from multiple threads, both transfers can enter the synchronized block at the same time:

public void threadA() {
  paymentService.withdrawMoney(UUID.fromString("ea051187-bb4b-4b07-9150-700000000000"), 1000);
}

public void threadB() {
  paymentService.withdrawMoney(UUID.fromString("ea051187-bb4b-4b07-9150-700000000000"), 5000);
}


In this case, you need to obtain the same reference for equal objects in order to synchronize on it.
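
The difference between equality and identity is easy to check in isolation. The following minimal sketch (the class name UuidIdentityDemo and its methods are made up for illustration) parses the same text twice and compares the results both ways:

```java
import java.util.UUID;

// Hypothetical demo class, not part of the banking application.
final class UuidIdentityDemo {

    // True when two separately parsed UUIDs have the same value.
    static boolean equalByValue(String text) {
        return UUID.fromString(text).equals(UUID.fromString(text));
    }

    // True only when both calls return the very same object,
    // which is what a synchronized block actually relies on.
    static boolean sameInstance(String text) {
        return UUID.fromString(text) == UUID.fromString(text);
    }
}
```

For the ID used above, equalByValue returns true while sameInstance returns false, so the two threads lock on two different monitors.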

Wrong Ways to Solve This Issue

Synchronized Methods

You can move the synchronized keyword to the method:

public synchronized void withdrawMoney(UUID userId, int amountOfMoney) {
    ..
}


This solution performs poorly: it blocks money transfers for absolutely all users. And if you need to synchronize different operations in different classes by the same key, this solution does not help at all.

String Intern

To ensure that the object holding the user ID is the same instance in all synchronized blocks, we can serialize it into a String and use String.intern() to obtain the same reference for equal strings.

String.intern uses a global pool to store interned strings. When you call intern on a string, you get a reference from this pool if an equal string already exists there; otherwise, the string is added to the pool.

You can find more details about String.intern in The Java Language Specification - 3.10.5 String Literals or in the Oracle Java documentation for String.intern.

public void withdrawMoney(UUID userId, int amountOfMoney) {
  synchronized (userId.toString().intern()) {
      ..
  }
}


Using intern is not a good practice because the pool of interned strings is difficult for the GC to clean up, and with heavy use of String.intern your application can consume too many resources.

Also, there is a chance that foreign code synchronizes on the same string instance as your application. This can lead to deadlocks.
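
The sharing is a direct consequence of the pool being global. As a small sketch (InternLockDemo and its method names are hypothetical), two pieces of code that never exchange objects still end up with one monitor when they intern equal text:

```java
// Hypothetical demo class showing that interned strings are shared process-wide.
final class InternLockDemo {

    // Lock object as this application would obtain it.
    static Object lockUsedByApplication(String userId) {
        return userId.intern();
    }

    // Lock object as an unrelated library might obtain it for its own purposes.
    static Object lockUsedByForeignCode(String anyText) {
        return new String(anyText).intern();
    }

    // Both callers receive the same object whenever the text is equal.
    static boolean shareMonitor(String text) {
        return lockUsedByApplication(text) == lockUsedByForeignCode(text);
    }
}
```

Because shareMonitor returns true for any text, a lock taken in one place can block, or be blocked by, code the application knows nothing about.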

In general, the use of intern is better left to the internal libraries of the JDK; there are good articles by Aleksey Shipilev about this concept.

How Can We Solve This Problem Correctly?

Create Your Own Synchronization Primitive

We need to implement the behavior shown in the following diagram:

diagram of the expected synchronization behavior

First, we need to create a new synchronization primitive: a custom mutex that works by the value of a variable, and not by the reference to the object.

It will be something like a "named mutex," but a little broader, with the ability to use the value of any object for identification, not just the value of a String. Other languages (C++, C#) have synchronization primitives that lock by name. Now, we will solve this issue in Java.

The solution will look something like this:

public void withdrawMoney(UUID userId, int amountOfMoney) {
  synchronized (XMutex.of(userId)) {
      ..
  }
}
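
The XMutex class itself is not listed at this point. As a rough idea of what a value-based mutex might look like, here is a minimal sketch; ValueMutex is a hypothetical stand-in, and the library’s actual class may differ:

```java
import java.util.Objects;

// Hypothetical stand-in for a value-based mutex; not the library's actual class.
final class ValueMutex<K> {

    private final K key;

    private ValueMutex(K key) {
        this.key = Objects.requireNonNull(key, "key");
    }

    static <K> ValueMutex<K> of(K key) {
        return new ValueMutex<>(key);
    }

    K getKey() {
        return key;
    }

    // Two mutexes are equal when their keys are equal ...
    @Override
    public boolean equals(Object other) {
        return other instanceof ValueMutex
                && key.equals(((ValueMutex<?>) other).key);
    }

    // ... so they also hash alike and can be looked up in a map.
    @Override
    public int hashCode() {
        return key.hashCode();
    }
}
```

Note that of(..) in this sketch still returns a new object on every call, so equality alone does not make the synchronized block work; it only makes it possible to find an already created mutex by its key.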


To ensure that the same mutex is obtained for equal values of variables, we will create a mutex factory:

public void withdrawMoney(UUID userId, int amountOfMoney) {
  synchronized (XMutexFactory.get(userId)) {
      ..
  }
}

public void purchase(UUID userId, int amountOfMoney, VendorDescription vendor) {
  synchronized (XMutexFactory.get(userId)) {
      ..
  }
}


To return the same mutex instance for every request with an equal key, we need to store the created mutexes. If we store them in a simple HashMap, the map grows as new keys appear, and we have no way of knowing when a mutex is no longer used anywhere.
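
The growth problem can be seen in a minimal sketch of such a naive factory. StrongMutexFactory is a hypothetical name, and a ConcurrentHashMap is used instead of a plain HashMap only so that the sketch is safe to call from several threads:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical naive factory that keeps every mutex strongly reachable.
final class StrongMutexFactory<K> {

    private final Map<K, Object> mutexes = new ConcurrentHashMap<>();

    // Returns the same lock object for equal keys.
    Object get(K key) {
        return mutexes.computeIfAbsent(key, k -> new Object());
    }

    // Number of mutexes retained so far; nothing ever removes them.
    int size() {
        return mutexes.size();
    }
}
```

Equal keys do get the same lock object here, but every key that was ever used stays in the map for the lifetime of the factory.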

In this case, we can use a WeakReference so that the map keeps a reference to the mutex only while it is in use. To implement this behavior, we can use the WeakHashMap data structure. I wrote an article about this type of reference a couple of months ago; you can read about it in more detail here: Soft, Weak, Phantom References in Java

Our mutex factory will be based on the WeakHashMap. The mutex factory creates a new mutex if a mutex for this value (key) is not found in the map, and then adds the created mutex to the map. Using the WeakHashMap allows us to keep a mutex in the map while any references to it exist; once all other references to it are released, the garbage collector can remove the mutex from the map.

We need to use a synchronized version of WeakHashMap; let’s see what’s described in the documentation about it:

This class is not synchronized. A synchronized WeakHashMap may be constructed
using the Collections.synchronizedMap method.


That is unfortunate, and a little later we’ll take a closer look at why. But for now, let’s consider an implementation based on what the official documentation proposes (I mean the use of Collections.synchronizedMap):

public final Map<XMutex<KeyT>, WeakReference<XMutex<KeyT>>> weakHashMap =
    Collections.synchronizedMap(new WeakHashMap<XMutex<KeyT>, 
                                                WeakReference<XMutex<KeyT>>>());

public XMutex<KeyT> getMutex(KeyT key) {
    validateKey(key);
    return getExist(key)
            .orElseGet(() -> saveNewReference(key));
}

private Optional<XMutex<KeyT>> getExist(KeyT key) {
    return Optional.ofNullable(weakHashMap.get(XMutex.of(key)))
                   .map(WeakReference::get);
}

private XMutex<KeyT> saveNewReference(KeyT key) {

    XMutex<KeyT> mutex = XMutex.of(key);

    WeakReference<XMutex<KeyT>> res = weakHashMap.put(mutex, new WeakReference<>(mutex));
    if (res != null && res.get() != null) {
        return res.get();
    }
    return mutex;
}
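
In the listing above, the lookup (getExist) and the insertion (saveNewReference) are two separate calls on the synchronized map, so the sequence as a whole is not atomic. One possible way to close that window, sketched here under the assumption that a single coarse lock is acceptable, is to do both steps inside one synchronized method. LockedWeakMutexFactory and its nested Mutex class are hypothetical names, not the library’s code:

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;

// Hypothetical variant of the factory; names differ from the article's code on purpose.
final class LockedWeakMutexFactory<K> {

    // Minimal value-based mutex: equal keys give equal mutexes.
    static final class Mutex<K> {
        private final K key;

        Mutex(K key) {
            this.key = key;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Mutex && key.equals(((Mutex<?>) o).key);
        }

        @Override
        public int hashCode() {
            return key.hashCode();
        }
    }

    // The value is a WeakReference because WeakHashMap holds its values strongly;
    // a strong value pointing at its own key would keep the entry alive forever.
    private final Map<Mutex<K>, WeakReference<Mutex<K>>> map = new WeakHashMap<>();

    // Lookup and insertion happen under one lock, so two threads
    // cannot both miss and then create separate mutexes for the same key.
    synchronized Mutex<K> getMutex(K key) {
        Mutex<K> candidate = new Mutex<>(key);
        WeakReference<Mutex<K>> ref = map.get(candidate);
        Mutex<K> existing = (ref == null) ? null : ref.get();
        if (existing != null) {
            return existing;
        }
        map.put(candidate, new WeakReference<>(candidate));
        return candidate;
    }
}
```

This keeps the behavior simple to reason about, at the price of one global lock for every call to the factory.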


What About Performance?

If we look at the code of Collections.synchronizedMap, we find a lot of synchronization on a single global mutex, which is created together with the SynchronizedMap instance.

SynchronizedMap(Map<K,V> m) {
    this.m = Objects.requireNonNull(m);
    mutex = this;   
}


And all other methods of the SynchronizedMap are synchronized on mutex:

public int size() {
    synchronized (mutex) {return m.size();}
}
public boolean containsKey(Object key) {
    synchronized (mutex) {return m.containsKey(key);}
}
public V get(Object key) {
    synchronized (mutex) {return m.get(key);}
}
public V put(K key, V value) {
    synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
    synchronized (mutex) {return m.remove(key);}
}

...


This solution does not have the best performance. Because every operation takes the same lock, all threads that use the mutex factory contend with each other on each call.

ConcurrentHashMap With a WeakReference as a Key

Let’s look at the ConcurrentHashMap. It performs better than Collections.synchronizedMap, but there is one problem: the ConcurrentHashMap doesn’t support weak references. This means that the garbage collector cannot remove unused mutexes.

I found two ways to solve this problem:

  • The first is to create my own ConcurrentMap implementation. This is the right decision, but it will take a very long time.
  • The second one is the use of the ConcurrentReferenceHashMap implementation from the Spring Framework. This is a good implementation, but it has a couple of nuances. We will consider them below.

Let’s change the XMutexFactory implementation to use a ConcurrentReferenceHashMap:

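public class XMutexFactory<KeyT> {

  // Default settings; the concrete values here are assumed for illustration.
  // The WEAK reference type lets the GC reclaim mutexes that are no longer used.
  private static final int DEFAULT_INITIAL_CAPACITY = 16;
  private static final float DEFAULT_LOAD_FACTOR = 0.75f;
  private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  private static final ConcurrentReferenceHashMap.ReferenceType DEFAULT_REFERENCE_TYPE =
          ConcurrentReferenceHashMap.ReferenceType.WEAK;

  private final ConcurrentMap<KeyT, XMutex<KeyT>> map;
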
  /**
   * Create mutex factory with default settings
   */
  public XMutexFactory() {
      this.map = new ConcurrentReferenceHashMap<>(DEFAULT_INITIAL_CAPACITY,
                                                  DEFAULT_LOAD_FACTOR,
                                                  DEFAULT_CONCURRENCY_LEVEL,
                                                  DEFAULT_REFERENCE_TYPE);
  }

  /**
   * Creates and returns a mutex by the key.
   * If the mutex for this key already exists in the weak-map,
   * then returns the same reference of the mutex.
   */
  public XMutex<KeyT> getMutex(KeyT key) {
      return this.map.compute(key, (k, v) -> (v == null) ? new XMutex<>(k) : v);
  }

}


That’s cool!

Less code and better performance than before. Let’s measure the performance of this solution.

Create a Simple Benchmark

I made a small benchmark in order to select an implementation.

There are three implementations of the Map involved in the test:

  •  Collections.synchronizedMap based on the WeakHashMap 
  •  ConcurrentHashMap 
  •  ConcurrentReferenceHashMap 

I use the ConcurrentHashMap in the benchmark only as a baseline for comparison. This implementation is not suitable for the mutex factory because it does not support weak or soft references.

All benchmarks are written using the JMH library.

# Run complete. Total time: 00:04:39

Benchmark                                   Mode     Cnt        Score         Error   Units
ConcurrentMap.ConcurrentHashMap            thrpt       5        0,015 ±       0,004  ops/ns
ConcurrentMap.ConcurrentReferenceHashMap   thrpt       5        0,008 ±       0,001  ops/ns
ConcurrentMap.SynchronizedMap              thrpt       5        0,005 ±       0,001  ops/ns
ConcurrentMap.ConcurrentHashMap             avgt       5      565,515 ±      23,638   ns/op
ConcurrentMap.ConcurrentReferenceHashMap    avgt       5     1098,939 ±      28,828   ns/op
ConcurrentMap.SynchronizedMap               avgt       5     1503,593 ±     150,552   ns/op
ConcurrentMap.ConcurrentHashMap           sample  301796      663,330 ±      11,708   ns/op
ConcurrentMap.ConcurrentReferenceHashMap  sample  180062     1110,882 ±       6,928   ns/op
ConcurrentMap.SynchronizedMap             sample  136290     1465,543 ±       5,150   ns/op
ConcurrentMap.ConcurrentHashMap               ss       5   336419,150 ±  617549,053   ns/op
ConcurrentMap.ConcurrentReferenceHashMap      ss       5   922844,750 ±  468380,489   ns/op
ConcurrentMap.SynchronizedMap                 ss       5  1199159,700 ± 4339391,394   ns/op


In this micro-benchmark, I create a situation where several threads compute values in the map. You can look at the source code of this benchmark in more detail here: Concurrent Map benchmark

Let’s put the results on a graph:

benchmark result

So, the ConcurrentReferenceHashMap justifies its use in this case.

Getting Started With the XSync Library

I packed this code into the XSync library, and you can use it as a ready-made solution for synchronizing on the value of variables.

To do so, add the following dependency:

<dependency>
  <groupId>com.antkorwin</groupId>
  <artifactId>xsync</artifactId>
  <version>1.1</version>
</dependency>


Then, you can create instances of the XSync class for the types that you need to synchronize on. With the Spring Framework, you can declare them as beans:

@Bean
public XSync<UUID> xSync(){
    return new XSync<>();
}


And now, you can use it:

@Autowired
private XSync<UUID> xSync;

public void withdrawMoney(UUID userId, int amountOfMoney) {
  xSync.execute(userId, () -> {
      Result result = externalPolicySystem.validateTransfer(userId, amountOfMoney, WITHDRAW);
      accountService.transfer(userId, amountOfMoney, WITHDRAW);
  });
}

public void purchase(UUID userId, int amountOfMoney, VendorDescription vendor) {
  xSync.execute(userId, () -> {
      ..
  });
}
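
Conceptually, an execute-style method only has to look up the mutex for the key and run the action while holding it. The following is a minimal sketch of that idea, not the library’s implementation: KeyedExecutor is a hypothetical name, and the mutex source is passed in as a plain Function so that any factory returning the same object for equal keys can be plugged in.

```java
import java.util.function.Function;

// Hypothetical sketch of an execute-style helper; the library's real code may differ.
final class KeyedExecutor<K> {

    // Supplies one lock object per key value (for example, a mutex factory).
    private final Function<K, Object> mutexFactory;

    KeyedExecutor(Function<K, Object> mutexFactory) {
        this.mutexFactory = mutexFactory;
    }

    // Runs the action while holding the mutex that belongs to the key's value.
    void execute(K key, Runnable action) {
        synchronized (mutexFactory.apply(key)) {
            action.run();
        }
    }
}
```

Wrapping the synchronized block this way keeps the locking detail out of the business methods and makes it harder to accidentally synchronize on the key object itself.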


Concurrent Tests

To be sure that this code works well, I wrote several concurrent tests.

Here is an example of one of these tests:

public void testSyncBySingleKeyInConcurrency() {
   // Arrange
   XSync<UUID> xsync = new XSync<>();
   String id = UUID.randomUUID().toString();
   NonAtomicInt var = new NonAtomicInt(0);

   // The key part of the test:
   // a parallel stream increments the same non-atomic
   // integer variable from several threads
   IntStream.range(0, THREAD_CNT)
            .boxed()
            .parallel()
            .forEach(j -> xsync.execute(UUID.fromString(id), var::increment));  

   // Asserts
   await().atMost(5, TimeUnit.SECONDS)
          .until(var::getValue, equalTo(THREAD_CNT));

   Assertions.assertThat(var.getValue()).isEqualTo(THREAD_CNT);
}

/**
 * A deliberately non-thread-safe integer variable:
 */
@Getter
@AllArgsConstructor
private class NonAtomicInt {  
    private int value;

    public int increment() {
        return value++;
    }
}


Let’s see the result of this test:

concurrent test result


References

XSync library on GitHub: https://github.com/antkorwin/xsync

Examples using the XSync library: https://github.com/antkorwin/xsync-example

The original article can be found here.

Topics:
java, concurrency, synchronized, concurrenthashmap, weakreference, mutex
