Horizontal scaling of RDBMS with JTA and JEPLayer
These days we are used to hearing how good NoSQL databases are at horizontal scaling, right? You are lucky, because the need to scale is a sign of a successful service. But before crossing the Rubicon, think twice: first try to exhaust the horizontal scaling capabilities of your preferred RDBMS.
I’m not going to discuss how well NoSQL databases scale. Everybody knows that RDBMSs have been with us for many years, with many years still to come. There are many reasons for this longevity; one of them is the richness of the query language, which is very hard to beat.
The first typical option for scaling with only one RDBMS instance is adding some kind of in-memory cache. This option may be fine, but as you can easily figure out, an in-memory cache is the poor man's version of an RDBMS; more precisely, it is the poor man's version of an in-memory RDBMS. An in-memory cache is not going to help much when you perform any SQL query beyond the typical primary key-based lookup, and a cache does not save you from working hard to keep both environments, RDBMS and cache, in sync. For instance, think about the problem of rolling back a transaction: the same kind of rollback must be performed in the cache, unless the cache supports the same transactional behavior (the sketch below illustrates the pitfall).
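A minimal sketch of that pitfall, assuming a naive java.util.Map used as cache and a hypothetical PERSON table (the table, column and method names here are made up for illustration and are not part of the article's code): if the database transaction rolls back after the cache was already updated, the cache is left holding data that was never committed.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheRollbackPitfall {

    // Naive in-memory cache shared by the application
    private final Map<Long, String> cache = new ConcurrentHashMap<Long, String>();

    public void updateName(Connection con, long personId, String newName) throws Exception {
        con.setAutoCommit(false);
        try {
            PreparedStatement ps = con.prepareStatement("UPDATE PERSON SET NAME = ? WHERE ID = ?");
            ps.setString(1, newName);
            ps.setLong(2, personId);
            ps.executeUpdate();
            ps.close();

            // Cache updated *before* commit: if anything below fails...
            cache.put(personId, newName);

            doMoreWorkThatMayFail(con); // ...and the transaction rolls back,
            con.commit();
        } catch (Exception ex) {
            con.rollback(); // ...the database is consistent again,
            // but the cache still holds a value that was never committed.
            // You now need compensating logic (e.g. cache.remove(personId))
            // and you must get it right on every write path.
            throw ex;
        }
    }

    private void doMoreWorkThatMayFail(Connection con) throws Exception {
        // placeholder for the rest of the multi-row transaction
    }
}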
This kind of problem has made me think about the convenience of replacing an in-memory cache with... an RDBMS. RDBMSs try to cache as much as possible in memory. Of course they are much slower than an in-memory cache because write operations are saved to disk, but you appreciate their value when any SQL query gets cached for free and transactions work with no hassle!
The previous reasoning is absurd when it comes to comparing the typical in-memory cache plus RDBMS combination with direct access to a single RDBMS. Of course the cache/RDBMS combination performs better, regardless of how much the RDBMS caches. But I’m talking about ACID horizontal scaling, that is, running your application on several nodes. This is the situation where an in-memory cache becomes a problem if you want to keep all of the nodes in sync while avoiding any kind of inconsistency (I’m not talking about the typical constant data that is never going to change). For instance, think about the problem of getting your in-memory cache in sync when a multi-row transaction rollback happens.
Using your RDBMS as a cache
Yes, my proposal is to use one RDBMS per application server/node as a sort of sophisticated cache. If database size is not a problem, you can maintain several RDBMSs with the same data and no loss of relational capabilities (although this approach could also support some kind of sharding).
Am I a fool? Yes...maybe...but we are talking about horizontal scaling :)
Before you raise your hand to ask about the problem of writing to several RDBMSs (nodes) to keep all data in sync: I am aware of this problem, and I’m sure you agree with me that in a typical application:
- Reads are overwhelmingly more frequent than writes
- Written data is orders of magnitude smaller than read data
So there is no serious problem in writing to several distant databases while always reading from the nearest (local) RDBMS.
Write operations take linearly more time as more nodes are added to the cloud (if write operations happen sequentially; asynchronous writing may be an option when supported), but at the same time your read throughput increases linearly because read operations happen locally (every node reads data from its local RDBMS). These days, with multiversion RDBMSs, read operations involve almost no blocking. Of course write operations can become too expensive when you add many nodes; it depends on your requirements, and it is up to you to decide when this kind of scaling reaches an unacceptable performance level for writing data. I’m talking about horizontally scaled RDBMSs that are always in sync, using transactions with no loss of relational capabilities, that is, the same ACID guarantees as a single RDBMS but with several RDBMS instances.
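A rough sketch of that access pattern, assuming one JDBC DataSource per node and a known index for the local node (the class, table and method names here are illustrative, not the article's API): reads always hit the local DataSource, writes are applied to every DataSource in the same fixed order. Atomicity across the writes is not handled here; that is exactly what JTA adds, as discussed next.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import javax.sql.DataSource;

public class ReadLocalWriteAll {

    private final DataSource[] nodes; // one DataSource per RDBMS node, same order everywhere
    private final int localIndex;     // index of the nearest (local) node

    public ReadLocalWriteAll(DataSource[] nodes, int localIndex) {
        this.nodes = nodes;
        this.localIndex = localIndex;
    }

    // Reads scale linearly: every application node queries its own RDBMS.
    public String readName(long id) throws Exception {
        Connection con = nodes[localIndex].getConnection();
        try {
            PreparedStatement ps = con.prepareStatement("SELECT NAME FROM PERSON WHERE ID = ?");
            ps.setLong(1, id);
            ResultSet rs = ps.executeQuery();
            return rs.next() ? rs.getString(1) : null;
        } finally {
            con.close();
        }
    }

    // Write cost grows with the number of nodes: the same statement is applied to all of them,
    // always in the same order to avoid deadlocks between concurrent transactions.
    public void writeName(long id, String name) throws Exception {
        for (DataSource ds : nodes) {
            Connection con = ds.getConnection();
            try {
                PreparedStatement ps = con.prepareStatement("UPDATE PERSON SET NAME = ? WHERE ID = ?");
                ps.setString(1, name);
                ps.setLong(2, id);
                ps.executeUpdate();
            } finally {
                con.close();
            }
        }
    }
}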
Of course, to get ACID behavior when writing to several databases you need JTA, and JTA provides two things:
- atomic operations on several databases used in the same node
- distributed transactions
We only need to use the first one.
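For illustration, this is roughly what "atomic operations on several databases used in the same node" looks like with plain JTA, assuming a standalone provider such as JOTM or Atomikos has already been configured and that both DataSources are XA-capable and enlisted with it (the class and variable names are placeholders). The JEPLayer example further below achieves the same effect through JEPLJTAMultipleDataSource and JEPLTask.

import java.sql.Connection;
import java.sql.PreparedStatement;
import javax.sql.DataSource;
import javax.transaction.UserTransaction;

public class MultiDbJtaWrite {

    // Both DataSources must be XA-capable and registered with the JTA provider
    // (how this is done depends on the provider: JOTM, Atomikos, GlassFish...).
    public void insertEverywhere(UserTransaction ut, DataSource localDs, DataSource remoteDs,
                                 long id, String name) throws Exception {
        ut.begin();
        try {
            execInsert(localDs, id, name);
            execInsert(remoteDs, id, name);
            ut.commit();   // both inserts commit, or...
        } catch (Exception ex) {
            ut.rollback(); // ...both are rolled back by the JTA provider
            throw ex;
        }
    }

    private void execInsert(DataSource ds, long id, String name) throws Exception {
        Connection con = ds.getConnection();
        try {
            PreparedStatement ps = con.prepareStatement("INSERT INTO PERSON (ID, NAME) VALUES (?, ?)");
            ps.setLong(1, id);
            ps.setString(2, name);
            ps.executeUpdate();
        } finally {
            con.close();
        }
    }
}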
This approach is just a manual alternative to the synchronous automatic replication offered by some databases. It is another synchronous option when you want full control over the scalability problem, or when there is no synchronous replication option in your RDBMS (as in MySQL). You will be hard pressed to find literature about this manual approach, and that is one of the motivations of this article.
Some code
JEPLayer is a low-level ORM created to save developers from the typical tedious JDBC and transaction demarcation tasks. It has a special focus on transactions, including JTA transactions with the same semantics as Java EE. JEPLayer provides unique support for JTA across multiple databases (DataSources).
The following code excerpt is taken from the JEPLayer 1.0.1 source code distribution, class test.scaling.TestScalingJTA. This example simulates many heavily concurrent requests randomly executing selects, inserts and deletes. Select queries are executed against the same database (the local database), and inserts and deletes are executed in the same local JTA transaction, with randomly simulated exceptions to force coordinated rollbacks by the JTA provider. The example was tested with the JOTM and Atomikos standalone JTA providers; JTA in GlassFish is reported to work with JEPLayer, but it was not tested in this example.
You need to set up several databases (this example uses MySQL) in different nodes and execute this code on every node at the same time to simulate heavy concurrency across several nodes.
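The excerpt below assumes a person table already exists in every database. The exact schema ships with the JEPLayer test sources and is not reproduced in this article, but a plausible MySQL definition matching the fields used by the test, created from Java for convenience, could look like this (URL, credentials and column sizes are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreatePersonTable {

    // Hypothetical schema matching the fields used by the test (id, name, phone, email, age);
    // the real schema in the JEPLayer distribution may differ.
    private static final String DDL =
        "CREATE TABLE IF NOT EXISTS PERSON (" +
        "  ID BIGINT NOT NULL AUTO_INCREMENT," +
        "  NAME VARCHAR(255) NOT NULL," +
        "  PHONE VARCHAR(20) NOT NULL," +
        "  EMAIL VARCHAR(255) NOT NULL," +
        "  AGE INT NOT NULL," +
        "  PRIMARY KEY (ID)" +
        ") ENGINE=InnoDB"; // InnoDB is required for transactional behavior

    public static void main(String[] args) throws Exception {
        // Run once against every node's MySQL instance.
        Connection con = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/scalingtest", "user", "password");
        try {
            Statement st = con.createStatement();
            st.executeUpdate(DDL);
            st.close();
        } finally {
            con.close();
        }
    }
}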
public void test(final TestScalingConf conf, final JEPLJTAMultipleDataSource jdsMgr,
        final PersonDAOScalingTest[] personDaoArr) throws Exception {
    final int[] inserted = new int[1];
    final int[] deleted = new int[1];
    final int[] select = new int[1];

    int numberOfThreads = conf.getNumberOfThreads();

    Random randRoot = new Random();
    final Random[] randArr = new Random[numberOfThreads];
    for (int i = 0; i < numberOfThreads; i++)
        randArr[i] = new Random(randRoot.nextLong());

    Thread[] threadArray = new Thread[numberOfThreads];
    final boolean[] run = new boolean[]{false};

    for (int i = 0; i < threadArray.length; i++) {
        final int threadNumber = i;
        Thread thread = new Thread() {
            @Override
            public void run() {
                while (!run[0]) Thread.yield();
                try {
                    executeActionsByThread(conf, jdsMgr, personDaoArr,
                            randArr[threadNumber], inserted, deleted, select);
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            }
        };
        thread.start();
        threadArray[i] = thread;
    }

    long start = System.currentTimeMillis();
    run[0] = true;

    for (int i = 0; i < threadArray.length; i++)
        threadArray[i].join();

    long end = System.currentTimeMillis();
    long lapse = end - start;

    System.out.println("LAPSE: " + lapse);
    System.out.println("INSERTED: " + inserted[0] + ", per second: " + (1000.0 * inserted[0] / lapse));
    System.out.println("DELETED: " + deleted[0] + ", per second: " + (1000.0 * deleted[0] / lapse));
    System.out.println("SELECTS: " + select[0] + ", per second: " + (1000.0 * select[0] / lapse));
}
public void executeActionsByThread(TestScalingConf conf, final JEPLJTAMultipleDataSource jdsMgr,
        final PersonDAOScalingTest[] personDaoArr, final Random rand,
        final int[] inserted, final int[] deleted, final int[] select) throws Exception {
    int loopsPerRepetition = conf.getNumberOfLoopsEveryRepetition();
    final int masterDataSourceIndex = TestScalingJTAShared.getMasterDataSourceIndex(conf, personDaoArr);
    final int closerDataSourceIndex = TestScalingJTAShared.getCloserDataSourceIndex(conf, personDaoArr);
    int ratioSelectChange = conf.getRatioSelectChange();
    int ratioInsertDelete = conf.getRatioInsertDelete();
    final boolean testRollback = conf.getTestRollback();

    for (int loop = 0; loop < loopsPerRepetition; loop++) {
        int rndNum = rand.nextInt(ratioSelectChange);
        if (rndNum == 0) {
            int rndNumIns = rand.nextInt(ratioInsertDelete);
            if (rndNumIns == 0) {
                JEPLTask task = new JEPLTask() {
                    @JEPLTransactionalJTA(propagation = JEPLTransactionPropagation.REQUIRED)
                    public Object exec() throws Exception {
                        int index = rand.nextInt(personDaoArr.length);
                        PersonDAOScalingTest dao = personDaoArr[index];
                        List<Person> list = dao.selectRangeOrderByIdDesc(0, 1);
                        if (list.size() > 0) {
                            Person person = list.get(0);
                            TestScalingJTAShared.deletePerson(masterDataSourceIndex, person,
                                    personDaoArr, testRollback, rand);
                            deleted[0]++;
                        }
                        return null;
                    }
                };
                try {
                    jdsMgr.exec(task);
                } catch (JEPLException ex) {
                    if (ex.getCause() == null || !ex.getCause().getMessage().startsWith("FALSE ERROR"))
                        throw new RuntimeException("Unexpected", ex);
                    else
                        System.out.println("EXPECTED ROLLBACK (DELETE)");
                }
            } else {
                JEPLTask task = new JEPLTask() {
                    @JEPLTransactionalJTA(propagation = JEPLTransactionPropagation.REQUIRED)
                    public Object exec() throws Exception {
                        TestScalingJTAShared.insertPerson(masterDataSourceIndex, personDaoArr, testRollback, rand);
                        inserted[0]++;
                        return null;
                    }
                };
                try {
                    jdsMgr.exec(task);
                } catch (JEPLException ex) {
                    if (ex.getCause() == null || !ex.getCause().getMessage().startsWith("FALSE ERROR"))
                        throw new RuntimeException("Unexpected", ex);
                    else
                        System.out.println("EXPECTED ROLLBACK (INSERT)");
                }
            }
        } else {
            JEPLTask task = new JEPLTask() {
                @JEPLTransactionalJTA(propagation = JEPLTransactionPropagation.NOT_SUPPORTED)
                public Object exec() throws Exception {
                    PersonDAOScalingTest dao = personDaoArr[closerDataSourceIndex];
                    dao.selectRangeOrderByIdDesc(0, 50);
                    select[0]++;
                    return null;
                }
            };
            jdsMgr.exec(task);
        }
    }
}

The following method of TestScalingJTAShared shows how the same data is inserted into all databases and how the insertions are executed sequentially. In this example there is a “master” database, and the insertion into it is slightly different because the primary key is generated there; this is necessary because MySQL's automatic primary key generation is not transactional. If you generate your own primary keys, there is no master and the insertion code is the same for every database. In any case, the order of the databases during insertion must be the same in all nodes to avoid deadlocks.
public static Person insertPerson(int masterDSIndex, PersonDAOScalingTest[] personDaoArr,
        boolean testRollback, Random rand) {
    Person person = new Person();
    person.setName("A Person object");
    person.setPhone("1111111");
    person.setEmail("hello@world.com");
    person.setAge(20);

    PersonDAOScalingTest dao = personDaoArr[masterDSIndex];
    dao.insertKeyGenerated(person);

    for (int i = 0; i < personDaoArr.length; i++) {
        if (i == masterDSIndex) continue;
        if (testRollback && rand.nextInt(3) == 0)
            throw new RuntimeException("FALSE ERROR INSERT");
        PersonDAOScalingTest currDao = personDaoArr[i];
        currDao.insertKeyNotGenerated(person);
    }
    return person;
}
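The deletePerson counterpart is not reproduced in this excerpt. Under the same assumptions (delete executed on every database, always in the same array order, inside the surrounding JTA transaction), it would look roughly like the sketch below; the deleteById and getId method names are guesses, not the real JEPLayer test API.

public static void deletePerson(int masterDSIndex, Person person, PersonDAOScalingTest[] personDaoArr,
        boolean testRollback, Random rand) {
    // Delete from the master first, then from the other nodes, always in the same
    // array order so concurrent transactions on different nodes cannot deadlock.
    personDaoArr[masterDSIndex].deleteById(person.getId());
    for (int i = 0; i < personDaoArr.length; i++) {
        if (i == masterDSIndex) continue;
        if (testRollback && rand.nextInt(3) == 0)
            throw new RuntimeException("FALSE ERROR DELETE"); // forces a coordinated rollback
        personDaoArr[i].deleteById(person.getId());
    }
}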
The result is clear: an ACID-compliant database cluster, a linear increase in select throughput and a linear degradation of writes; if the ratio of selects to writes is high, you get an overall performance and scalability gain.
Have you tried something similar for horizontally scaling your RDBMS?
How do you horizontally scale up?