Message Throttling Implementation With Buffering
This guide describes how you can implement a throttling mechanism in Java, allowing you to limit the egress rate regardless of the ingress rate for a system.
Introduction
Software engineers spend a great deal of their time improving the speed and throughput of their systems. Scalability is also a big concern nowadays, usually tackled by building scale-out capabilities. There are times, however, when we have to slow down a system's rate. It may be due to a limited resource that is very hard (or very expensive) to scale, or a dependency on a third-party service that imposes a fixed rate for billing purposes (e.g., speed tiers). How can you add such a throttling capability to a scalable system that may span hundreds of servers? Furthermore, how do you implement such a bottleneck with proper overflow handling, so it can gracefully handle spikes without messages getting lost?
Problem Definition and Constraints
For the purposes of this article, we assume that there is a need to limit a message delivery rate because a downstream provider imposes such a limit. This provider can support higher rates but at an increased cost. Since the upstream clients only occasionally exceed this rate, there is no business justification for upgrading the speed tier. Also, let's assume that the provider will drop any messages arriving at a rate greater than the speed tier rate.
Our task is to:
- Throttle the message rate when our clients exceed the imposed limit (avoid messages being dropped).
- Queue any messages that would exceed the rate (delayed send).
- Use the maximum available rate at all times.
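To make the queueing requirement concrete: with an imposed rate of R messages per interval, a burst of B messages (B > R) leaves B − R messages that must be buffered, and they drain in (B − R) / R intervals. A quick sketch of that arithmetic, using illustrative numbers:

```java
public class DrainTime {
    public static void main(String[] args) {
        int ratePerSecond = 8;   // illustrative imposed rate (messages/second)
        int burstSize = 20;      // illustrative burst arriving in one second

        // Messages exceeding the rate must be queued rather than dropped
        int queued = burstSize - ratePerSecond;
        double drainSeconds = (double) queued / ratePerSecond;

        System.out.println(queued + " messages queued, drained in "
                + drainSeconds + "s");  // 12 messages queued, drained in 1.5s
    }
}
```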
Implementation
Our clients sending the messages use a REST API implemented with Quarkus. This is a trivial JAX-RS endpoint:
@Path("/messages")
public class MessageResource {

    @Inject
    MessageService messageService;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    @Produces(MediaType.TEXT_PLAIN)
    public String submitMessage(final String message) {
        messageService.submitMessage(message);
        return "OK";
    }
}
The MessageService class is an @ApplicationScoped bean that delegates message submission to our Throttler class, which is where the magic happens.
@ApplicationScoped
public class MessageService {

    private Throttler<Serializable> throttler;

    @PostConstruct
    void setup() {
        throttler = new Throttler.ThrottlerBuilder<>()
                .withMsgsPerInterval(8)
                .withIntervalUnits(1)
                .withIntervalUnit(TimeUnit.SECONDS)
                .withBuffer(new SimpleThrottlingBuffer<>())
                .build();
        throttler.start();
    }

    @PreDestroy
    void stop() {
        throttler.stop();
    }

    public void submitMessage(final String message) {
        throttler.submit(message);
    }
}
You can also see the throttler's configured rate of 8 messages per 1 second in the builder calls withMsgsPerInterval(8), withIntervalUnits(1), and withIntervalUnit(TimeUnit.SECONDS).
Actual throttling happens in the Throttler class, which is essentially an implementation of the 'leaky bucket used as a queue' algorithm. While the full source is available on GitHub, let's focus on the most critical parts.
Constructor
private Throttler(
        final int msgsPerInterval,
        final int intervalUnits,
        final TimeUnit intervalUnit,
        final ThrottlingBuffer<T> buffer) {
    this.buffer = buffer;
    semaphore = new Semaphore(msgsPerInterval);
    // We always need a free executor thread for the refiller, hence the +1
    ses = Executors.newScheduledThreadPool(msgsPerInterval + 1);
    // Schedule the refill for the leaky bucket
    ses.scheduleAtFixedRate(
            refill(msgsPerInterval), intervalUnits, intervalUnits, intervalUnit);
}
We use a counting Semaphore, which holds a fixed number of permits, capped at the count of messages (msgsPerInterval) allowed by the imposed rate. When a message goes through the Throttler, it consumes one such permit in order to be relayed; if the semaphore has no permits, the message has to wait for one. Then a new ScheduledThreadPoolExecutor is created with enough threads to handle concurrent messages at the maximum rate, plus one for the scheduled refill task, which tops up the semaphore at a fixed interval via scheduleAtFixedRate.
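The body of refill itself is not shown above; a hedged sketch of what it might look like, assuming it simply tops the semaphore back up to msgsPerInterval permits and never beyond (the class name RefillSketch is illustrative):

```java
import java.util.concurrent.Semaphore;

public class RefillSketch {
    final Semaphore semaphore;

    public RefillSketch(final int msgsPerInterval) {
        this.semaphore = new Semaphore(msgsPerInterval);
    }

    // Hypothetical refill: release only the permits consumed since the last
    // tick, so available permits never exceed the configured maximum.
    Runnable refill(final int msgsPerInterval) {
        return () -> {
            final int missing = msgsPerInterval - semaphore.availablePermits();
            if (missing > 0) {
                semaphore.release(missing);
            }
        };
    }
}
```

Releasing only the missing permits matters: blindly calling release(msgsPerInterval) every tick would let unused capacity accumulate and allow bursts above the imposed rate.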
Method submit(msg)
public void submit(T msg) {
    buffer.add(msg);
}
This method essentially enqueues a message; the buffer (queue) is monitored by our class, as explained in the next section.
Method start()
public void start() {
    buffer.restoreState();
    final Runnable worker = () -> {
        System.out.println("Starting worker thread...");
        while (true) {
            final ThrottledEntity<T> next;
            try {
                next = buffer.take();
            } catch (InterruptedException ex) {
                LOG.log(Level.FINE, "Interrupted!", ex);
                System.out.println("Interrupted, bailing");
                return;
            }
            if (next != null) {
                try {
                    semaphore.acquire();
                } catch (InterruptedException ex) {
                    throw new RuntimeException("Failed to acquire semaphore", ex);
                }
                ses.execute(() -> {
                    next.processed();
                    System.out.println("Processed msg: " + next.getEntity()
                            + " after " + next.getQueueTime());
                });
            } else {
                System.out.println("Queue is empty...");
            }
        }
    };
    workerThread = new Thread(worker, "Worker Thread");
    workerThread.setDaemon(true);
    workerThread.start();
}
This is the magic part. When the throttler is started, a new worker thread is spawned to monitor the queue and dispatch the buffered messages. Monitoring is performed by calling take(), a blocking method that waits for an item to become available. When the buffer has messages, an attempt is made to acquire a permit from our semaphore; this is again a blocking call that waits until a permit is available. Once a permit is granted, the message is dispatched in the lambda passed to ses.execute. This is where the actual call to our downstream provider would go.
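The whole pattern — blocking queue as the buffer, counting semaphore as the leaky bucket, scheduled refill — can be condensed into a self-contained sketch. The names (MiniThrottler, processed) and the refill body are illustrative, not taken from the repository:

```java
import java.util.List;
import java.util.concurrent.*;

public class MiniThrottler {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final Semaphore semaphore;
    private final ScheduledExecutorService ses;
    private final List<String> processed = new CopyOnWriteArrayList<>();

    public MiniThrottler(final int msgsPerInterval, final long interval, final TimeUnit unit) {
        this.semaphore = new Semaphore(msgsPerInterval);
        // +1 thread for the refiller, mirroring the real constructor
        this.ses = Executors.newScheduledThreadPool(msgsPerInterval + 1);
        // Top the bucket back up at a fixed rate, never exceeding the cap
        ses.scheduleAtFixedRate(() -> {
            final int missing = msgsPerInterval - semaphore.availablePermits();
            if (missing > 0) semaphore.release(missing);
        }, interval, interval, unit);
    }

    public void submit(final String msg) {
        buffer.add(msg);
    }

    public void start() {
        final Thread worker = new Thread(() -> {
            while (true) {
                try {
                    final String next = buffer.take(); // blocks until a message arrives
                    semaphore.acquire();               // blocks until a permit is free
                    ses.execute(() -> processed.add(next)); // downstream call goes here
                } catch (InterruptedException ex) {
                    return; // bail out on interrupt, like the real worker
                }
            }
        }, "mini-worker");
        worker.setDaemon(true);
        worker.start();
    }

    public List<String> processed() {
        return processed;
    }
}
```

Submitting fewer messages than the available permits lets them through immediately; anything beyond that waits in the queue until the next refill tick.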
ThrottlingBuffer Interface
public interface ThrottlingBuffer<T extends Serializable> {
    boolean isEmpty();
    ThrottledEntity<T> take() throws InterruptedException;
    void add(T entity);
    void saveState();
    void restoreState();
}
This is the interface implemented by any class that can handle overflowed (queued) messages for the Throttler. The most important methods are add and take (blocking). The state management methods exist to handle failures, i.e., to allow the throttler to survive restarts without losing messages. A production implementation would use a durable backend, such as Redis, but a simplistic implementation is provided in SimpleThrottlingBuffer.
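As a rough idea of what an in-memory implementation could look like, here is a hedged sketch backed by a LinkedBlockingQueue. This is not the repository's SimpleThrottlingBuffer, and the ThrottledEntity stand-in is also simplified for illustration:

```java
import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal ThrottledEntity stand-in (the real class lives in the repository):
// wraps a message and tracks how long it waited in the queue.
class ThrottledEntity<T extends Serializable> implements Serializable {
    private final T entity;
    private final long enqueuedAt = System.currentTimeMillis();
    private volatile long processedAt;

    ThrottledEntity(final T entity) { this.entity = entity; }
    T getEntity() { return entity; }
    void processed() { processedAt = System.currentTimeMillis(); }
    long getQueueTime() { return processedAt - enqueuedAt; }
}

// In-memory buffer: take() blocks via the underlying queue, and the no-op
// state methods are exactly why this variant cannot survive restarts.
class InMemoryThrottlingBuffer<T extends Serializable> {
    private final BlockingQueue<ThrottledEntity<T>> queue = new LinkedBlockingQueue<>();

    public boolean isEmpty() { return queue.isEmpty(); }
    public ThrottledEntity<T> take() throws InterruptedException { return queue.take(); }
    public void add(final T entity) { queue.add(new ThrottledEntity<>(entity)); }
    public void saveState() { /* no-op: state lives only in memory */ }
    public void restoreState() { /* no-op */ }
}
```

A durable variant would serialize the queued entities in saveState and reload them in restoreState, which is what makes a backend like Redis attractive here.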
Source code is available on GitHub. Refer to the README for instructions on demonstrating the solution.