Implementing Locks in a Distributed Environment

Getting started with lock implementation in a distributed environment, using services such as Kafka, Cassandra, Transformer, and Rules Engine.

By Prasanth Gullapalli · Dec. 07, 2020

As we know, locks are generally used to control access to a shared resource by multiple threads running simultaneously. They protect data integrity and atomicity in concurrent applications: only the thread holding the lock can access the shared resource at any given time. But a lock in a distributed environment is more than just a mutex in a multi-threaded application. It is more complicated because the lock now has to be acquired across all the nodes, while any of the nodes in the cluster, or the network between them, can fail.

Here is the user story we will use to explain the scenarios in the rest of this article. The application takes data in the user’s preferred format and converts it into a standardized format, like PDF, that can be uploaded to a government portal. Two microservices of the application do this work: Transformer and Rules Engine. We use Cassandra for persistence and Kafka as a message queue. Also, please note that the user request, once accepted, returns immediately; once the PDF is generated, the user is notified asynchronously. This is achieved in the following sequence of steps:

  • The user request is put into a message queue.
  • Once the Transformer service picks up the user request, it transforms the user-uploaded file into a format that the Rules Engine can understand.
  • The data is then run through the Rules Engine, which updates the data points.
  • Finally, the data is transformed into a PDF, and the user is notified.

First, let us understand why we need to acquire locks at all in a distributed environment. The following are the use cases we have used distributed locks for:

  • Efficiency: This is to make sure that the same expensive computation does not happen multiple times. For example, suppose a user has uploaded a file for processing. If the system is under heavier load due to a spike in requests, or the current file is too large to process quickly, it might take a while to generate the PDF. If the user becomes restless waiting to be notified, they may upload the file again (adding more load to the system unnecessarily). This can be avoided by taking a lock on the checksum of the file before processing it (a sketch of deriving such a key follows this list).
  • Correctness: This is to avoid data corruption in the application. With locks, two concurrent/parallel processes do not step on the underlying data; if they operate on the same data set simultaneously without acquiring a lock, there is a high chance the data gets corrupted. For example, say we have received sales transactions and line-item data from the user. The tax amount at the transaction level is calculated as the sum of the tax already levied at the transaction level and any additional taxes at the line level. If the rules are executed for the same transaction on two different nodes in parallel, there is a very good chance that the tax amount gets incremented twice for the line items. This can be avoided by taking a lock at the transaction level.
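
For the efficiency case, the lock key can simply be a checksum of the uploaded file, so duplicate uploads map to the same lock. Here is a minimal sketch using the JDK's MessageDigest (the class, package, and method names are illustrative, not part of the original application):

Java

package common.concurrent.lock.util; // hypothetical location

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class FileChecksums {

    // Returns the SHA-256 hex digest of the file contents. Re-uploads of the same file
    // produce the same key, so they contend for the same lock and are processed only once.
    public static String checksumKey(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] hash = digest.digest(Files.readAllBytes(file));
        StringBuilder hex = new StringBuilder(hash.length * 2);
        for (byte b : hash) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    private FileChecksums() {
    }
}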

Please note that locks are often not seen as a good idea. Blocking operations increase contention for the underlying resources, thereby limiting the system's computational capacity. Also, locking in a distributed environment is much more difficult and dangerous than in a single process, for the following reasons:

  • What happens to the lock when a node which has acquired it has crashed without releasing it?
  • How do we deal with cases of network partitioning?

These questions bring the additional dimension of consensus into the picture. We will get into the idea of distributed consensus in a while.

For all the above reasons, we should avoid these locks if any alternative solutions exist. Here are two possible approaches that can be used in the application:

  1. Optimistic locking: The resource is not actually locked in this case. Before committing the transaction, we check whether the resource has been updated by someone else; if the data is stale, the transaction is rolled back and an error is returned to the user. In contrast, pessimistic locking takes an exclusive lock so that no one else can modify the resource – for example, select-for-update locks in databases, or Java locks. Hibernate provides support for optimistic locking (a sketch follows this list).
  2. Usage of partitions in Kafka: As mentioned earlier, we always put the user requests into Kafka before processing them. We do it this way because availability is one of the core architectural principles of the application; we did not want the application to crash when the load increases many-fold during peak usage. Kafka stores the messages published to a topic in multiple partitions internally, and it guarantees that messages from a given partition are always served to consumers in the same order as they were published. Leveraging this, we can publish all requests that we don’t want to process in parallel (and would otherwise lock around) to the same partition. This is done by specifying a partition key while publishing the message: messages with the same key go to the same partition. Since the messages from a partition are consumed sequentially, we don’t need locks anymore (see the producer sketch below).
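
For the optimistic locking alternative, here is a minimal JPA/Hibernate sketch (the entity and field names are illustrative, and the javax.persistence API is assumed): a @Version column makes conflicting concurrent updates fail instead of silently overwriting each other.

Java

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Version;
import java.math.BigDecimal;

@Entity
public class SalesTransaction {
    @Id
    private Long id;

    private BigDecimal taxAmount;

    // Hibernate reads this value when the row is loaded and checks/increments it on update;
    // a concurrent modification surfaces as an OptimisticLockException at commit time.
    @Version
    private long version;

    // getters and setters omitted for brevity
}

And for the partitioning approach, here is a minimal producer sketch (topic name, key, and payload are illustrative; the standard kafka-clients API is assumed): every message for a given transaction is published with the same key, so Kafka routes them to the same partition and a single consumer processes them in order.

Java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class UserRequestPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Partition key: all messages for this transaction land in the same partition,
            // so they are consumed strictly in publish order and no lock is needed.
            String transactionId = "TXN-123";
            String payload = "{\"resourceId\":\"TXN-123\",\"action\":\"EXECUTE_RULES\"}";
            producer.send(new ProducerRecord<>("user-requests", transactionId, payload));
        }
    }
}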

There might still be cases that do not fit the above scenarios, where we prefer to take a distributed lock. Distributed consensus comes into the picture when we talk about distributed locks. It can be defined as the process of getting all nodes in a cluster to agree on some specific value based on their votes: all nodes must agree on the same value, and it must be a value submitted by at least one of the nodes. When a particular node is said to have acquired a distributed lock in a cluster, the rest of the nodes in the cluster have to agree that the lock has been taken by it. There are multiple consensus algorithms, such as Paxos, Raft, ZAB, and PacificA; I have given links explaining these algorithms towards the end of the blog for those interested. Here are the two most common ways of implementing consensus systems:

  • Symmetric/leader-less: All servers participating in the consensus have equal roles, so the client can connect to any one of them. Example: Paxos.
  • Asymmetric/leader-based: At any given time, one of the servers participating in the consensus acts as the leader, and the rest accept the leader’s decisions. Clients can only communicate with the leader. Examples: Raft, ZAB.

Lock Implementation

For decades, distributed consensus was synonymous with Paxos, but there are now several alternatives, as discussed above; Raft, for instance, overcomes some of the drawbacks of traditional Paxos. Each of the algorithms mentioned above has different implementations: Cassandra implemented Paxos for its lightweight transactions, Kafka's replication protocol resembles PacificA, and Zookeeper and Hazelcast use ZAB and Raft, respectively. Here is the generic interface of the distributed lock in our application:

Java

package common.concurrent.lock;

import java.util.concurrent.TimeUnit;

/**
 * Provides the interface for the distributed lock implementations based on Zookeeper and Hazelcast.
 * @author pgullapalli
 */
public interface DistributedLock {
    /**
     * Acquires the lock. If the lock is not available, the current thread blocks until the lock has been acquired.
     * The distributed lock acquired by a thread has to be released by the same thread only.
     **/
    void lock();

    /**
     * Non-blocking version of the lock() method; it attempts to acquire the lock immediately and returns true if locking succeeds.
     * The distributed lock acquired by a thread has to be released by the same thread only.
     **/
    boolean tryLock();

    /**
     * Acquires the lock. Blocks until the lock is available or the timeout expires.
     * The distributed lock acquired by a thread has to be released by the same thread only.
     **/
    boolean tryLock(long timeout, TimeUnit unit);

    /**
     * Checks if the current thread has already acquired the lock.
     */
    boolean isLocked();

    /**
     * Releases the lock. This method has to be called by the same thread that acquired the lock.
     */
    void release();
}

public interface DistributedLocker {

    /**
     * Only fetches the lock object; it does not acquire the lock. The lock has to be acquired and released explicitly.
     * @param key Fetch the lock object based on the key provided.
     * @return Implementation of the DistributedLock object
     */
    DistributedLock getLock(String key);

}
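
A minimal usage sketch of these interfaces (the method and key names are illustrative; any of the implementations described below can be plugged in as the DistributedLocker):

Java

import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;

import java.util.concurrent.TimeUnit;

public class LockUsageExample {

    // Hypothetical caller: acquire the lock with a timeout and always release it from the same thread.
    public void processTransaction(DistributedLocker locker, String transactionId) {
        DistributedLock lock = locker.getLock(transactionId);
        if (lock.tryLock(5, TimeUnit.SECONDS)) {
            try {
                // run the rules for this transaction exactly once across the cluster
            } finally {
                lock.release(); // released by the same thread that acquired it
            }
        } else {
            // could not acquire the lock in time: retry later or skip the duplicate work
        }
    }
}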


For our application, here are the options that we have explored for implementing distributed locks:

a) InterProcessSemaphoreMutex from Zookeeper: Curator, open-sourced by Netflix, is a high-level API built on top of Zookeeper. It provides many recipes and handles the complexity of managing connections and retrying operations against the underlying ZooKeeper ensemble. InterProcessSemaphoreMutex, a recipe from the Curator framework, is a mutex that works across JVMs (unlike Curator's InterProcessMutex, it is not re-entrant). It uses Zookeeper to hold the lock: all processes, across JVMs, that use the same lock path will achieve an inter-process critical section. Further, this mutex is “fair” – each user will get the mutex in the order requested (from Zookeeper’s point of view).

Java

package common.concurrent.lock.impl;

import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class ZKBasedDistributedLocker implements DistributedLocker {
    private final CuratorFramework curatorClient;
    private final String basePath;

    public ZKBasedDistributedLocker(){
        curatorClient = CuratorFrameworkFactory.newClient("localhost:2181",
                new ExponentialBackoffRetry(1000, 3));
        curatorClient.start(); // the client must be started before any recipes are used
        basePath = "/config/sample-app/distributed-locks/";
    }

    @Override
    public DistributedLock getLock(String key) {
        String lockPath = basePath + key; // each lock key maps to its own znode path
        return new ZKLock(new InterProcessSemaphoreMutex(curatorClient, lockPath));
    }

    private class ZKLock implements DistributedLock {
        private final InterProcessLock lock;

        public ZKLock(InterProcessLock lock){
            this.lock = lock;
        }

        @Override
        public void lock() {
            try {
                lock.acquire();
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }

        @Override
        public boolean tryLock() {
            return tryLock(10, TimeUnit.MILLISECONDS);
        }

        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                return lock.acquire(timeout, unit);
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }

        @Override
        public boolean isLocked() {
            return lock.isAcquiredInThisProcess();
        }

        @Override
        public void release() {
            try {
                lock.release();
            } catch (Exception e) {
                throw new RuntimeException("Error while releasing lock", e);
            }
        }
    }
}


As Zookeeper is already commonly used in many distributed systems, this option does not require any additional framework just for locking. One observation, however, is that performance degraded as the number of locks increased. This is because all the locks are actually created as znodes internally; as the number of znodes grew, we even started facing problems while listing or deleting the locks folder in Zookeeper. So Zookeeper is a good fit for cases where we take only a few locks. Also, since many services of an application might depend on Zookeeper, any problem with Zookeeper can impact them as well – for example, microservices registering themselves with it for service discovery, or services using Kafka, which (in this setup) depends on Zookeeper for leader election.

b) Lightweight transactions from Cassandra: It is easy to achieve strong consistency in master-based distributed systems, but it also means the system's availability is compromised if the master is down. Cassandra is a master-less system that trades consistency for availability. It falls under the AP category of the CAP theorem and hence is highly available and eventually consistent by default. Eventual consistency implies that a read after a write may not return the latest value written. But we can achieve strong consistency in Cassandra by specifying the consistency level for a query as QUORUM: a write succeeds only after it has been written to a majority of the replicas, and a quorum read then sees it. We can implement a lock in Cassandra as follows:

  1. Create a table to hold the locks: create table lock_requests(resource_id text, lock_status text, created_on timestamp, primary key(resource_id));
  2. A thread that tries to acquire a lock checks whether an entry already exists in the locks table for the specified key: select * from lock_requests where resource_id = 'ABC';
  3. If the lock does not exist, we say the lock is acquired after inserting an entry into the table: insert into lock_requests(resource_id, lock_status, created_on) values('ABC', 'Locked', toTimestamp(now()));

Please note that there is always the possibility of a race condition between threads executing steps 2 and 3 if we perform them as separate steps from the application. But if the database itself checks for row existence before the insertion, the race condition can be avoided. This is referred to as linearizable consistency (i.e., the serial isolation level in ACID terms), and it is exactly what lightweight transactions provide. Here is how steps 2 and 3 above can be combined:

SQL

insert into lock_requests(resource_id, lock_status, created_on) values('ABC', 'Locked', toTimestamp(now())) if not exists;


If the lock already exists, the above write fails, and hence the lock is not acquired. The next problem is what happens if the service that acquired the lock never releases it – the server might have crashed, or the code might have thrown an exception – so the lock would never be freed. For such cases, we can define a time-to-live (TTL) for the row, meaning the lock row automatically expires after the prescribed number of seconds. Here is how we can achieve it by defining a default TTL for every row of the table:

SQL

create table lock_requests(resource_id text, lock_status text, created_on timestamp, primary key(resource_id)) with gc_grace_seconds=86400 and default_time_to_live=600;


Now the lock will automatically expire in 10 minutes. The default can be overridden per row by specifying a TTL on the write itself (CQL's USING TTL clause). TTL might not help, though, if we don't have a rough estimate of how long the computation guarded by the lock can take.

Java

package common.concurrent.lock.impl;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;
import org.apache.commons.lang3.time.StopWatch;

import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class CassandraDistributedLocker implements DistributedLocker {
    private final CqlSession session;
    private final PreparedStatement selectStatement, insertStatement, deleteStatement;

    public CassandraDistributedLocker(){
        session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress("127.0.0.1", 9042))
                .withKeyspace("sample").build();
        selectStatement = session.prepare(
                "select * from lock_requests where resource_id=?");
        insertStatement = session.prepare(
                "insert into lock_requests(resource_id,lock_status,created_on) values(?,?,?) if not exists");
        deleteStatement = session.prepare(
                "delete from lock_requests where resource_id=? if exists");
    }

    @Override
    public DistributedLock getLock(String key) {
        return new CassandraLock(key);
    }

    private class CassandraLock implements DistributedLock {
        private final String key;

        public CassandraLock(String key) {
            this.key = key;
        }

        @Override
        public void lock() {
            insertLock();
        }

        private boolean insertLock() {
            BoundStatement boundStatement = insertStatement.bind()
                    .setString(0, key)
                    .setString(1, "LOCKED")
                    .setInstant(2, Instant.now());
            ResultSet resultSet = session.execute(boundStatement);
            return resultSet.wasApplied(); // equivalent to reading the "[applied]" column of the LWT result
        }

        @Override
        public boolean tryLock() {
            return tryLock(10, TimeUnit.MILLISECONDS);
        }

        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                boolean locked = false;
                StopWatch stopWatch = StopWatch.createStarted();
                // busy-retry the conditional insert until it succeeds or the timeout elapses
                while (stopWatch.getTime(unit) < timeout) {
                    if (insertLock()) {
                        locked = true;
                        break;
                    }
                }
                return locked;
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }

        @Override
        public boolean isLocked() {
            BoundStatement boundStatement = selectStatement.bind().setString(0, key);
            ResultSet resultSet = session.execute(boundStatement);
            Row row = resultSet.one();
            return row != null && "LOCKED".equals(row.getString("lock_status"));
        }

        @Override
        public void release() {
            try {
                BoundStatement boundStatement = deleteStatement.bind().setString(0, key);
                session.execute(boundStatement);
            } catch (Exception e){
                throw new RuntimeException("Error while releasing lock", e);
            }
        }
    }
}


Cassandra internally uses a modified version of Paxos to implement lightweight transactions, at the cost of four extra round-trips to achieve linearizability. That sounds expensive – perhaps too expensive in the rare case of an application that requires every operation to be linearizable. But for most applications only a tiny minority of operations require linearizability, so LWTs are a good tool to add alongside the eventual/tunable consistency Cassandra already provides.

Of course, this solution is viable only if the application is already using Cassandra for persistence. We have also seen LWTs time out under heavy load, so it is better to exercise these locks with caution. One nice property of these locks is that there is no constraint that the lock must be released by the one who acquired it, which can come in handy in scenarios where one microservice takes the lock initially and another service releases it asynchronously after the workflow completes.

c) Distributed locks with Hazelcast: Hazelcast IMDG provides distributed versions of the fundamental Java collections and synchronizers. The beauty of the Hazelcast API is that it is pretty simple to understand because it mirrors the Java API itself; for example, com.hazelcast.map.IMap extends java.util.Map, so the learning curve is small. The distributed map implementation has a method to lock a specific key. If the lock is not available, the current thread is blocked until the lock has been released. We can take a lock on a key even if it is not present in the map; in that case, any thread other than the lock owner will block if it tries to put the locked key into the map.

Java

package common.concurrent.lock.impl;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;

import java.util.concurrent.TimeUnit;

public class HzMapBasedDistributedLocker implements DistributedLocker {
    private final IMap<String, Object> txLockMap;

    public HzMapBasedDistributedLocker(){
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        txLockMap = hazelcastInstance.getMap("txLockMap");
    }

    @Override
    public DistributedLock getLock(String lockKey) {
        return new HzMapBasedLock(lockKey);
    }

    private class HzMapBasedLock implements DistributedLock {
        private final String key;

        public HzMapBasedLock(String key) {
            this.key = key;
        }

        @Override
        public void lock() {
            txLockMap.lock(key);
        }

        @Override
        public boolean tryLock() {
            return txLockMap.tryLock(key);
        }

        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                return txLockMap.tryLock(key, timeout, unit);
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }

        @Override
        public boolean isLocked() {
            return txLockMap.isLocked(key);
        }

        @Override
        public void release() {
            try {
                txLockMap.unlock(key);
            } catch (Exception e){
                throw new RuntimeException("Error while releasing lock", e);
            }
        }
    }
}


Please note that the Hazelcast IMDG implementation also falls under the AP category of the CAP theorem. However, strong consistency (even in failure/exceptional cases) is a fundamental requirement for any task that needs distributed coordination, so there are cases where the existing map-based locks will fail. To address these issues, Hazelcast later came up with the CP Subsystem, which provides a new distributed lock implementation on top of Raft consensus. The CP Subsystem lives alongside the AP data structures of the Hazelcast IMDG cluster. It maintains linearizability in all cases, including client and server failures and network partitions, and prevents split-brain situations. In fact, Hazelcast claims to be the only solution that offers a linearizable distributed lock implementation.

Java

package common.concurrent.lock.impl;

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;

import java.util.concurrent.TimeUnit;

public class HzLockBasedDistributedLocker implements DistributedLocker {
    private final HazelcastInstance hazelcastInstance;

    public HzLockBasedDistributedLocker(int cpMemberCount){
        Config config = new Config();
        // use the caller-supplied member count (must be an odd number, at least 3, for Raft majorities)
        config.getCPSubsystemConfig().setCPMemberCount(cpMemberCount);
        config.getCPSubsystemConfig().setGroupSize(cpMemberCount);
        hazelcastInstance = Hazelcast.newHazelcastInstance(config);
    }

    @Override
    public DistributedLock getLock(String key) {
        return wrapHzLock(key);
    }

    private DistributedLock wrapHzLock(String key){
        return new HzLock(key);
    }

    private class HzLock implements DistributedLock {
        private final FencedLock lock;

        public HzLock(String key) {
            this.lock = hazelcastInstance.getCPSubsystem().getLock(key);
        }

        @Override
        public void lock() {
            lock.lock();
        }

        @Override
        public boolean tryLock() {
            return lock.tryLock();
        }

        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                return lock.tryLock(timeout, unit);
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }

        @Override
        public boolean isLocked() {
            return lock.isLocked();
        }

        @Override
        public void release() {
            try {
                lock.unlock();
                //((DistributedObject) lock).destroy(); // see the note on destroying locks below
            } catch (Exception e){
                throw new RuntimeException("Error while releasing lock", e);
            }
        }
    }
}


The above code looks pretty clean and simple. The problem is that locks in Hazelcast never expire on their own unless they are explicitly destroyed. If locks are created frequently and never destroyed, we may end up with out-of-memory errors over time. The following excerpt from the Hazelcast documentation clarifies this:

Plain Text

Locks are not automatically removed. If a lock is not used anymore, Hazelcast does not automatically perform garbage collection in the lock. This can lead to an OutOfMemoryError. If you create locks on the fly, make sure they are destroyed.


Although the fix looks trivial – uncomment the destroy line in the above code – the problem is that a lock, once destroyed, cannot be recreated in the same CP group unless the CP Subsystem is restarted. So if the locks need to be reused after they are released, we cannot destroy them; in such cases it is better to use the map-based implementation instead. Based on the specific use case, one can go with either implementation. Hazelcast may address this issue in the near future.

There are other frameworks, such as Redis, that offer distributed locks and that I have not covered here; I have listed them in the resources section. One final point to keep in mind: always use these locks with caution, and if an alternative solution that does not require locks exists, go with that.

Additional Resources

  1. Implementing Replicated Logs with Paxos
  2. Raft: A Consensus Algorithm for Replicated Logs
  3. Zab vs. Paxos
  4. Lightweight Transactions in Cassandra 2.0
  5. Architecture of ZAB – ZooKeeper Atomic Broadcast Protocol
  6. Distributed Locks with Redis
  7. Distributed Locks are Dead; Long Live Distributed Locks!

Published at DZone with permission of Prasanth Gullapalli. See the original article here.

Opinions expressed by DZone contributors are their own.
