Reading and Writing With a ConcurrentHashMap
Get up to speed with concurrent writes with ConcurrentHashMap.
ConcurrentHashMap provides a Map implementation with thread-safe read and write operations.
The Map and ConcurrentMap interfaces provide methods that ConcurrentHashMap takes advantage of to provide thread-safe interactions. Generally, I tend to rely solely on the Map interface, as it provides most of the same methods that ConcurrentMap has; however, depending on your use case, it might be beneficial to check out the ConcurrentMap methods yourself.
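For reference, the same ConcurrentHashMap can be held behind either interface; a minimal sketch (the names here are illustrative):

Map<String, Integer> asMap = new ConcurrentHashMap<>();
ConcurrentMap<String, Integer> asConcurrentMap = new ConcurrentHashMap<>();

// putIfAbsent and replace were originally declared on ConcurrentMap with
// atomic guarantees; Map now also exposes default versions of them.
asConcurrentMap.putIfAbsent("counter", 0);
asConcurrentMap.replace("counter", 0, 1);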
This post will look at reading from and writing to a ConcurrentHashMap.
Reading
ConcurrentHashMap allows concurrent read access to its contents; a key can still be read even while the same key is being modified by another thread. Reading itself does not affect the map’s contents and is therefore safe, breaking none of the guarantees that ConcurrentHashMap provides. However, this does mean that read values could be “old”, as the map doesn’t block reads even when updates are actively occurring on other threads. That being said, as soon as an update completes, reading threads will receive the updated value.
From my searching, there doesn’t seem to be a native Map implementation that prevents reads and updates from occurring simultaneously on the same key. There is Collections.synchronizedMap, but that synchronises access to the whole map rather than per key.
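To illustrate the contrast, a minimal sketch of Collections.synchronizedMap (the map contents here are just for illustration):

// Every operation locks the same monitor, so a slow operation on key 1
// blocks reads and writes of keys 2 and 3 as well.
Map<Integer, String> syncMap = Collections.synchronizedMap(new HashMap<>());
syncMap.put(1, "INITIAL");
String value = syncMap.get(1);

// Iteration additionally requires manual synchronisation on the map itself.
synchronized (syncMap) {
    for (Map.Entry<Integer, String> entry : syncMap.entrySet()) {
        System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
}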
Below is an example showing how reads are not blocked while the map is being written to using compute (which we’ll look at in the writing section):
public static void main(String[] args) {
    // Put some values into the map.
    Map<Integer, String> map = new ConcurrentHashMap<>(Map.of(
        1, "INITIAL",
        2, "INITIAL",
        3, "INITIAL"
    ));
    // Slowly write to key [1] in the map using `compute`.
    new Thread(() -> {
        while (true) {
            map.compute(1, (key, value) -> {
                for (int i = 0; i < 5; i++) {
                    System.out.println("[1] computing...");
                    sleep(1000);
                }
                return "COMPUTE";
            });
            System.out.println("[1] updated from compute");
        }
    }).start();
    // Read each key in the map in a loop.
    for (int key : map.keySet()) {
        new Thread(() -> {
            while (true) {
                String value = map.get(key);
                System.out.println("[" + key + "] read from thread - " + value);
                sleep(1000);
            }
        }).start();
    }
}

// Just added to contain the try/catch when sleeping.
private static void sleep(int time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
When run, the code outputs something like:
[1] computing...
[2] read from thread - INITIAL
[3] read from thread - INITIAL
[1] read from thread - INITIAL
[1] computing...
[2] read from thread - INITIAL
[3] read from thread - INITIAL
[1] read from thread - INITIAL
[1] computing...
[1] read from thread - INITIAL
[3] read from thread - INITIAL
[2] read from thread - INITIAL
[1] computing...
[2] read from thread - INITIAL
[3] read from thread - INITIAL
[1] read from thread - INITIAL
[1] computing...
[2] read from thread - INITIAL
[3] read from thread - INITIAL
[1] read from thread - INITIAL
[1] updated from compute
[1] computing...
[3] read from thread - INITIAL
[2] read from thread - INITIAL
[1] read from thread - COMPUTE
This shows that reads occur on every key (1, 2, 3) even though key 1 is being written to throughout the example’s execution.
Writing
ConcurrentHashMap prevents multiple threads from writing to the same key simultaneously. From a superficial perspective, when a write operation begins, such as compute, it takes a lock for the specified key and blocks any other threads trying to write to the same key. Once the write finishes, the lock is released, and the process begins again with one of the blocked threads taking the lock.

The more complicated explanation is that ConcurrentHashMap keeps key-value pairs in buckets/bins/segments (I’ll refer to them as buckets from now on), which hold many pairs and can be locked. Writing to a key locks its bucket, preventing writes to any key within that bucket. This means that seemingly unrelated writes to separate keys can still block each other; however, this is unlikely to noticeably impact your application. Writes to unlocked buckets are not blocked, with those buckets then transitioning to a locked state while the write executes.
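As a rough mental model only (not ConcurrentHashMap’s actual implementation, which also uses CAS on empty bins, tree bins, and cooperative resizing), per-bucket locking looks something like this:

// Toy sketch of per-bucket (striped) locking, for intuition only.
private static final Object[] BUCKET_LOCKS = new Object[16];
static {
    for (int i = 0; i < BUCKET_LOCKS.length; i++) {
        BUCKET_LOCKS[i] = new Object();
    }
}

private static void write(int key, Runnable update) {
    // The key's hash selects a bucket; writers synchronise per bucket, so
    // writes to keys that land in different buckets proceed in parallel.
    int bucket = (Integer.hashCode(key) & 0x7fffffff) % BUCKET_LOCKS.length;
    synchronized (BUCKET_LOCKS[bucket]) {
        update.run();
    }
}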
All the write methods available on the Map and ConcurrentMap interfaces work like this when using a ConcurrentHashMap.
The behaviour described above ensures that each write behaves deterministically. Otherwise, a call to compute could read the current value while another thread comes in, writes a new value, and then the compute call finishes and updates the value again. This is wrong because the compute’s code did not accommodate this change and is operating on out-of-date data. The same can be said about the key-value mapping being removed while compute is executing.
I’ve mentioned compute many times throughout this post. It is the primary method I use when writing to a ConcurrentHashMap where the current value should be read when determining the new one. Methods such as computeIfAbsent, computeIfPresent, and merge have similar behaviour to compute.
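For example, two common idioms using these methods (a sketch; the maps and keys are illustrative):

Map<String, Integer> counts = new ConcurrentHashMap<>();
Map<String, List<String>> groups = new ConcurrentHashMap<>();

// Atomically increment a counter: map to 1 if absent, otherwise sum with
// the existing value.
counts.merge("requests", 1, Integer::sum);

// Atomically create the list for a key on first use, then append to it.
// Note that only the list's creation is atomic; ArrayList itself is not
// thread-safe, so concurrent `add` calls would still need their own guard.
groups.computeIfAbsent("letters", key -> new ArrayList<>()).add("a");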
The example below shows that multiple threads cannot write to the same key at the same time:
public static void main(String[] args) {
    // Put some values into the map.
    Map<Integer, String> map = new ConcurrentHashMap<>(Map.of(
        1, "INITIAL",
        2, "INITIAL",
        3, "INITIAL"
    ));
    // Slowly write to key [1] in the map using `compute`.
    new Thread(() -> {
        while (true) {
            map.compute(1, (key, value) -> {
                for (int i = 0; i < 5; i++) {
                    System.out.println("[1] computing...");
                    sleep(1000);
                }
                return "COMPUTE";
            });
            System.out.println("[1] updated from compute");
        }
    }).start();
    // Write to each key in the map in a loop.
    for (int key : map.keySet()) {
        new Thread(() -> {
            while (true) {
                map.put(key, "THREAD");
                System.out.println("[" + key + "] updated from thread");
                sleep(1000);
            }
        }).start();
    }
}

// Just added to contain the try/catch when sleeping.
private static void sleep(int time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
When run, the code outputs something like:
[1] computing...
[2] updated from thread
[3] updated from thread
[1] computing...
[2] updated from thread
[3] updated from thread
[1] computing...
[2] updated from thread
[3] updated from thread
[1] computing...
[3] updated from thread
[2] updated from thread
[1] computing...
[3] updated from thread
[2] updated from thread
[1] updated from compute
[1] updated from thread
Here, compute and put are called for key 1 on different threads. The compute began executing first and claimed the lock, preventing the put from executing until the compute call completed. You can see this in the output, as there is no update to key 1 until compute finishes, and then the write from put executes immediately afterwards. While these threads contend with each other, the writes to keys 2 and 3 continue without issue.
A Closer Look at Compute
I wanted to quickly go over compute specifically, as I often use it myself but still have to check how to use it correctly each time… Although the JavaDocs for compute are actually quite thorough, seeing an example tends to make it sink in better (for me at least):
// Setup for the example.
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Map<Integer, ScheduledFuture<?>> map = new ConcurrentHashMap<>();

// Always schedule a new future that prints "Hello" after a delay.
// If a future already exists for the passed in `key`, then `cancel` it.
map.compute(key, (k, future) -> {
    if (future != null && !future.isDone()) {
        future.cancel(true);
    }
    return executorService.schedule(() -> System.out.println("Hello"), 10, TimeUnit.SECONDS);
});
I have written code similar to this example many times. compute is handy here because it allows you to atomically interact with the existing keyed value while providing a new mapping. In this case, it allows the existing future to be cancelled before creating a new one and holding it within the map.
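To make the behaviour concrete, here is a sketch that wraps the compute call above in a hypothetical scheduleHello helper (the helper name and structure are mine, not part of the original example):

// Hypothetical helper wrapping the `compute` call from the example above.
private static void scheduleHello(
    Map<Integer, ScheduledFuture<?>> map,
    ScheduledExecutorService executorService,
    int key
) {
    map.compute(key, (k, future) -> {
        if (future != null && !future.isDone()) {
            future.cancel(true);
        }
        return executorService.schedule(() -> System.out.println("Hello"), 10, TimeUnit.SECONDS);
    });
}

// Calling it twice in quick succession prints "Hello" only once, because the
// second call cancels the future created by the first.
scheduleHello(map, executorService, 1);
scheduleHello(map, executorService, 1);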
compute also lets you remove a mapping by returning null. However, this means that compute cannot be used to map a key to a null value. For ConcurrentHashMap though, this is a non-factor, as the data structure explicitly disallows null values. From the JavaDocs:
/**
* ...
* <p>Like {@link Hashtable} but unlike {@link HashMap}, this class
* does <em>not</em> allow {@code null} to be used as a key or value.
* ...
*/
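In practice, this fails fast; a minimal sketch:

Map<Integer, String> map = new ConcurrentHashMap<>();
// Each of these lines throws a NullPointerException.
map.put(1, null);
map.put(null, "value");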
Below is an example that removes a mapping using compute:
// Setup for the example.
Map<Integer, Integer> map = new ConcurrentHashMap<>();

map.compute(key, (k, value) -> {
    // `value` is null when there is no existing mapping for the key.
    int newValue = (value == null ? 0 : value) + input;
    if (newValue > 100) {
        // Returning null removes the mapping.
        return null;
    }
    return newValue;
});
This example removes the mapping if the input and existing value surpass a specific number, or otherwise updates the mapping to the new total.
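As a usage sketch, assuming the compute call above is wrapped in a hypothetical addCapped(map, key, input) method:

map.put(1, 60);
addCapped(map, 1, 30); // 60 + 30 = 90, mapping updated to 90
addCapped(map, 1, 30); // 90 + 30 = 120 > 100, mapping removed
System.out.println(map.containsKey(1)); // false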
Do Not Use get and put for Atomic Updates of Existing Mappings
ConcurrentHashMap cannot ensure deterministic behaviour if you write code like the following:
// `Runnable` created because non-final variables can't be used in lambdas.
static class MyThread implements Runnable {

    private int number;
    private AtomicInteger sharedUpdateCounter;
    private int maxSharedCount;
    private Map<Integer, Integer> sharedMap;

    public MyThread(
        int number,
        AtomicInteger sharedUpdateCounter,
        int maxSharedCount,
        Map<Integer, Integer> sharedMap
    ) {
        this.number = number;
        this.sharedUpdateCounter = sharedUpdateCounter;
        this.maxSharedCount = maxSharedCount;
        this.sharedMap = sharedMap;
    }

    @Override
    public void run() {
        // Keep updating until the shared count is reached by the threads.
        while (sharedUpdateCounter.getAndIncrement() < maxSharedCount) {
            // Read and increment the existing value.
            int existingValue = sharedMap.get(1);
            int newValue = existingValue + 1;
            // Update with the incremented value.
            sharedMap.put(1, newValue);
            System.out.println("Thread [" + number + "] 1 -> " + newValue);
            sleep(1000);
        }
    }
}
public static void main(String[] args) {
    Map<Integer, Integer> map = new ConcurrentHashMap<>(Map.of(
        1, 0
    ));
    // Set up a shared count and max value to limit the number of writes to 12.
    AtomicInteger j = new AtomicInteger(0);
    int maxSharedCount = 12;
    // Start threads.
    List<Thread> threads = Stream.of(1, 2, 3).map(i -> {
        Thread thread = new Thread(new MyThread(i, j, maxSharedCount, map));
        thread.start();
        return thread;
    }).toList();
    // Wait for the threads to finish.
    threads.forEach(thread -> {
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    System.out.println("Final value = " + map.get(1));
}
This code reads a key’s current value using get and then updates it using put, allowing updates from separate threads to overwrite each other and leading to a nondeterministic outcome:
Thread [2] 1 -> 2
Thread [3] 1 -> 3
Thread [1] 1 -> 1
Thread [2] 1 -> 4
Thread [3] 1 -> 4
Thread [1] 1 -> 4
Thread [1] 1 -> 5
Thread [3] 1 -> 5
Thread [2] 1 -> 5
Thread [2] 1 -> 7
Thread [3] 1 -> 7
Thread [1] 1 -> 6
Final value = 7
The final value would be 12 if each read and write pair occurred atomically.
Rewriting the example to use compute instead of paired calls to get and put gives us the deterministic result that we desire:
@Override
public void run() {
    while (sharedUpdateCounter.getAndIncrement() < maxSharedCount) {
        int newValue = sharedMap.compute(1, (key, existingValue) -> existingValue + 1);
        System.out.println("Thread [" + number + "] 1 -> " + newValue);
        sleep(1000);
    }
}
Outputs:
Thread [3] 1 -> 1
Thread [2] 1 -> 2
Thread [1] 1 -> 3
Thread [3] 1 -> 4
Thread [2] 1 -> 6
Thread [1] 1 -> 5
Thread [2] 1 -> 7
Thread [3] 1 -> 9
Thread [1] 1 -> 8
Thread [2] 1 -> 10
Thread [3] 1 -> 11
Thread [1] 1 -> 12
Final value = 12
The threads that perform each update will vary, but the final value is always consistent.
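As an aside, this particular increment could also be expressed with merge; a sketch of the equivalent call:

// Maps key 1 to 1 if absent; otherwise atomically sums 1 with the existing value.
int newValue = sharedMap.merge(1, 1, Integer::sum);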
Summary
ConcurrentHashMap allows concurrent reads while preventing simultaneous writes, ensuring that the map behaves deterministically. However, you must use the writing methods correctly to ensure your code behaves as expected. Relying on methods such as compute and merge, which allow you to atomically read an existing mapping and then act on it, is crucial; otherwise, the benefits of using ConcurrentHashMap are lost.