Distributed Scheduling and Servicing in Java
Schedulers are great for task execution, but they're tough to scale horizontally. Let's use Hazelcast's IMaps, Akka, and CRON expressions to make a more scalable system.
Task schedulers are a staple of enterprise solutions. Numerous articles on the Internet already discuss how to use such services and libraries, so we won't explore that any further.
Instead, we will try to address a recurring issue in scheduler-based solution design.
A scheduler is inherently a singleton component of a system. Put simply, if a scheduled execution carries a uniqueness commitment, we must make sure that one and only one instance of the scheduled task runs at a time, in order to avoid duplication.
The Problem: Scalability and Single Points of Failure
We run a schedule to execute some task, so the effect of triggering a schedule is performing some action. Since, as mentioned above, only a single scheduler instance may fire, the action also ends up running as a single execution.
And herein lies the problem: scheduler solutions tend to be difficult (or impossible) to scale horizontally, and they tend to become single points of failure.
Tearing a Scheduler Apart
If we look closely at a scheduler, it has, basically, two responsibilities:
Trigger next schedule
Execute task action
No. 1 is a time-dependent event, and it has to be done in a singleton fashion. However, can we scale out No. 2 into distributed parallel processing? Of course we can, and that can be the solution.
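The split can be sketched in plain Java. This is a minimal single-JVM illustration, not the prototype's code: the class and method names (SplitSchedulerSketch, trigger, execute) are hypothetical, and a thread pool stands in for the cluster members.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: all names are illustrative, not part of the prototype.
final class SplitSchedulerSketch {

    // Responsibility 1: the trigger. It only decides what has to be done for
    // this run and must fire exactly once, so it stays small and cheap.
    static List<String> trigger(String runId, int parts) {
        List<String> units = new ArrayList<>();
        for (int i = 0; i < parts; i++) {
            units.add(runId + "#" + i);
        }
        return units;
    }

    // Responsibility 2: the task action. Units of work are independent, so
    // they can be processed in parallel (here: threads; in a cluster: members).
    static List<String> execute(List<String> units, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<String> done = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(units.size());
        for (String unit : units) {
            pool.submit(() -> {
                done.add("processed " + unit);
                latch.countDown();
            });
        }
        try {
            // Wait (bounded) until every unit has been processed.
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return done;
    }
}
```

Calling SplitSchedulerSketch.execute(SplitSchedulerSketch.trigger("run-1", 4), 2) processes four units on two workers. Only trigger needs the singleton guarantee; execute can grow with the number of workers.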
This sounds pretty simple, and it will be, but we have to design a platform that is capable of:
Clustering to form a group of schedulers.
Triggering a unique schedule across the cluster.
Executing task actions in parallel.
Scaling out task actions as new members join the group.
Avoiding any single point of failure.
A Prototype for the Platform
To corroborate the concept, I have created a prototype project built on Hazelcast, Spring scheduling, and Akka actors. The source code is available on GitHub.
Design Notes
Clustering and distribution are built on top of the Hazelcast in-memory data grid.
Scheduling capability is extended from the Spring scheduler.
Execution threads are handled using Akka actors.
The complete design is reactive in approach.
No distributed clock synchronization technique is used, which requires all instances in the datacenter to be in the same timezone. Lagging instance clocks are handled, but only up to a certain limit. Check the code documentation for more information.
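One way to see why the timezone matters: if each member derives its schedule key by formatting its local wall-clock time, members in different zones produce different keys for the same instant and would never contend for the same lock. The sketch below illustrates this with a hypothetical key pattern (yyyyMMddHHmmss) and a hypothetical class name; the prototype's actual key format may differ.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: the key pattern is an assumption for illustration.
final class ZoneKeySketch {

    private static final DateTimeFormatter PATTERN =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Builds a timestamp-patterned key from the wall-clock time in the given zone.
    static String localKey(Instant fireTime, ZoneId zone) {
        return PATTERN.withZone(zone).format(fireTime);
    }
}
```

For the instant 2024-01-01T00:00:00Z, a member in UTC gets the key 20240101000000, while a member in Asia/Kolkata gets 20240101053000, so the two would each believe they own the run.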
Distributed schedule synchronization is achieved using an atomic putIfAbsent operation on a Hazelcast distributed IMap. It uses a timestamp-patterned key to implement distributed lock acquire/release semantics, so that each scheduled run is triggered only once across the cluster. As a consequence, schedules must be defined as CRON expressions; scheduleWithFixedDelay/Period-type recurrences are not supported.
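The putIfAbsent idea can be sketched as follows. This is a minimal illustration under stated assumptions, not the prototype's code: the class name, method names, and key format (task name plus epoch second) are hypothetical, and a plain ConcurrentMap stands in for the Hazelcast IMap, which implements the same interface.

```java
import java.time.Instant;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: class, methods, and key format are assumptions.
final class ScheduleLockSketch {

    // In a cluster this would be a Hazelcast IMap, which implements ConcurrentMap.
    private final ConcurrentMap<String, String> locks;
    private final String memberId;

    ScheduleLockSketch(ConcurrentMap<String, String> locks, String memberId) {
        this.locks = locks;
        this.memberId = memberId;
    }

    // All members that fire within the same second compute the same key.
    static String keyFor(String taskName, Instant fireTime) {
        return taskName + "@" + fireTime.getEpochSecond();
    }

    // putIfAbsent returns null only for the caller that inserted the key,
    // so exactly one member wins the right to run this schedule.
    boolean tryAcquire(String taskName, Instant fireTime) {
        return locks.putIfAbsent(keyFor(taskName, fireTime), memberId) == null;
    }
}
```

This also suggests why CRON expressions are required: CRON fire times are aligned to the wall clock, so every member derives the same key for the same run, whereas a fixed-delay schedule fires relative to each member's own start time. For releasing stale keys, Hazelcast's IMap offers a putIfAbsent overload that takes a time-to-live; whether the prototype uses it is not stated here.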
A distributed message processing capability is also introduced on top of the Hazelcast IMap. This capability can be leveraged by schedule executors to emit data for parallel processing across the cluster.
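How the prototype layers messaging on the IMap is not detailed here, but the routing idea (emit data to a named destination and let a listener with its own degree of parallelism consume it) can be sketched in memory. Everything below is a hypothetical stand-in: the class RoutingSketch and its methods are illustrative names, and local thread pools replace cluster-wide distribution.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: an in-memory stand-in, not the prototype's implementation.
final class RoutingSketch {

    private final Map<String, Consumer<String>> listeners = new ConcurrentHashMap<>();
    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();

    // Registers a listener for a destination with its own worker pool.
    void register(String destination, int parallelism, Consumer<String> listener) {
        listeners.put(destination, listener);
        pools.put(destination, Executors.newFixedThreadPool(parallelism));
    }

    // Hands the payload to the destination's pool; returns false if nobody listens.
    boolean emit(String destination, String payload) {
        Consumer<String> listener = listeners.get(destination);
        ExecutorService pool = pools.get(destination);
        if (listener == null || pool == null) {
            return false;
        }
        pool.submit(() -> listener.accept(payload));
        return true;
    }

    // Stops accepting work and waits briefly for queued messages to finish.
    void shutdown() {
        for (ExecutorService pool : pools.values()) {
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

The scheduled task only calls emit, which returns immediately; the heavy work happens on the listener's workers.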
Sample scheduler code:
import java.util.UUID;
import java.util.concurrent.TimeUnit;

// DistributedScheduledTask, TaskContext, Data, and TextData come from the prototype project.
public class MyScheduledTask extends DistributedScheduledTask {

    public static final String someKey = "SOME_KEY";

    private final String cronExpr;

    public MyScheduledTask(String cronExpr) {
        super();
        this.cronExpr = cronExpr;
    }

    @Override
    public void run(TaskContext context) {
        String value = UUID.randomUUID().toString();
        // Set the transient value only if the key is not already present.
        if (!context.containsKeyTransient(someKey)) {
            context.setTransient(someKey, value);
        }
        Data d = new TextData(value, "TEST-QUEUE");
        // Don't do heavy work here. Instead, emit a trigger to be processed
        // across the cluster at the destination TEST-QUEUE.
        context.emit(d);
    }

    @Override
    public String cronExpression() {
        return cronExpr;
    }

    @Override
    protected TimeUnit scheduleTimeunit() {
        return TimeUnit.SECONDS;
    }
}
Sample processor code:
// AbstractQueueListener and TextData come from the prototype project.
public class SimpleQueueListener extends AbstractQueueListener<TextData> {

    public SimpleQueueListener() {}

    @Override
    public Class<TextData> dataType() {
        return TextData.class;
    }

    @Override
    public void onMessage(TextData m) throws Exception {
        // `log` is assumed to be a logger visible to this class; it is not declared in this sample.
        log.info("Received message ... " + m.getPayload());
        // Now do your stuff here. This will run on an actor thread,
        // leveraging the Akka actor hierarchy model.
    }

    public int parallelism() {
        return 20;
    }

    // Same destination name that the scheduled task emits to.
    @Override
    public String routing() {
        return "TEST-QUEUE";
    }
}