Message Throttling Implementation With Buffering
This guide describes how to implement a throttling mechanism in Java that limits a system's egress rate regardless of its ingress rate.
Software engineers spend a great deal of their time improving the speed and throughput of their systems. Scalability is also a big concern nowadays, usually tackled by building scale-out capabilities. There are times, however, when we have to slow a system down. It may be due to a limited resource that is very hard (or very expensive) to scale, or a dependency on a third-party service that imposes a fixed rate for billing purposes (e.g., speed tiers). How can you add such a throttling capability to a scalable system that may span hundreds of servers? Furthermore, how do you implement such a bottleneck with proper overflow handling, so it can gracefully absorb spikes without losing messages?
Problem Definition and Constraints
For the purposes of this article, we assume that there is a need to limit a message delivery rate because a downstream provider imposes such a limit. This provider can support higher rates but at an increased cost. Since the upstream clients only occasionally exceed this rate, there is no business justification for upgrading the speed tier. Also, let's assume that the provider will drop any messages arriving at a rate greater than the speed tier rate.
Our task is to:
- Throttle the message rate when our clients exceed the imposed limit (avoid messages being dropped).
- Queue any messages that would exceed the rate (delayed send).
- Use the maximum available rate at all times.
Our clients send messages through a REST API implemented with Quarkus, exposed as a trivial JAX-RS endpoint. Note the throttler's configured rate of 8 messages per 1-second interval.
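The original listing is in the GitHub repository; a minimal reconstruction of such an endpoint might look like the following. The `Throttler` constructor arguments (8 messages per 1000 ms) mirror the rate described in the article, but the class, path, and method names here are assumptions, and the sketch presumes the article's `Throttler` class is on the classpath:

```java
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

// Hypothetical reconstruction of the Quarkus resource described in the article.
@Path("/messages")
public class MessageResource {

    // Allow at most 8 messages per 1-second interval (assumed constructor shape).
    private final Throttler throttler = new Throttler(8, 1000);

    @POST
    public Response send(String message) {
        // Enqueue only; the throttler delivers downstream at <= 8 msg/s.
        throttler.throttle(message);
        return Response.accepted().build();
    }
}
```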
Actual throttling happens in the Throttler class, which is essentially an implementation of the "leaky bucket used as a queue" algorithm. While the full source is available on GitHub, let's focus on the most critical parts.
We use a counting Semaphore holding a fixed number of permits, capped at the number of messages (msgsPerInterval) allowed per interval by the imposed rate. When a message goes through the Throttler, it consumes one permit in order to be relayed; if the semaphore has no permits available, the message waits for one. A ScheduledThreadPoolExecutor is then created with enough threads to handle concurrent messages at the maximum rate, plus one for the scheduled refill method, which replenishes the semaphore at a fixed interval.
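The construction just described can be sketched as follows; the class name (ThrottlerCore), the intervalMillis parameter, and the tryAcquire helper are assumptions for illustration, not the article's exact source:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Sketch of the Throttler's rate-limiting core as described in the article.
public class ThrottlerCore {
    private final int msgsPerInterval;
    private final Semaphore semaphore;
    private final ScheduledThreadPoolExecutor scheduler;

    public ThrottlerCore(int msgsPerInterval, long intervalMillis) {
        this.msgsPerInterval = msgsPerInterval;
        // Fixed number of permits caps how many messages may pass per interval.
        this.semaphore = new Semaphore(msgsPerInterval);
        // Enough threads for concurrent dispatch at max rate, plus one for refill.
        this.scheduler = new ScheduledThreadPoolExecutor(msgsPerInterval + 1);
        scheduler.scheduleAtFixedRate(
                this::refill, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // Top the semaphore back up to its cap: release only what was consumed,
    // so the permit count never exceeds msgsPerInterval.
    private void refill() {
        semaphore.release(msgsPerInterval - semaphore.availablePermits());
    }

    // Helper for demonstration: try to consume a permit without blocking.
    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }
}
```

The refill runs on one of the scheduler's threads, so message dispatch never has to wait on bookkeeping.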
This method essentially enqueues a message; the buffer (queue) is monitored by our class, as explained in the next section.
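A minimal sketch of such an enqueue-only method, assuming a String payload and an in-memory BlockingQueue standing in for the buffer:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: throttle() only buffers the message; delivery is
// deferred to the monitor thread described in the next section.
public class EnqueueSketch {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    public void throttle(String message) {
        buffer.add(message); // enqueue; actual send happens later
    }

    public int pending() {
        return buffer.size();
    }
}
```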
This is the magic part. When the throttler is started, a new worker thread is spawned to monitor the queue and dispatch the buffered messages. Monitoring is performed by calling take(), a blocking method that waits for an item to become available. When the buffer has messages, the worker attempts to acquire a permit from our semaphore; this is again a blocking call that waits until a permit is available. Once a permit is granted, the message is dispatched/handled. This is where the actual call to our downstream provider would be made.
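The worker loop described above can be sketched roughly as follows; the names (QueueMonitor, the Consumer handler standing in for the downstream call) are hypothetical, and the article's real dispatch logic lives in its Throttler class:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Sketch of the monitor thread: take() blocks until a message is buffered,
// acquire() blocks until the rate limit allows another send.
public class QueueMonitor {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final Semaphore semaphore;
    private final Consumer<String> handler; // stands in for the downstream provider
    private volatile boolean running = true;

    public QueueMonitor(int permits, Consumer<String> handler) {
        this.semaphore = new Semaphore(permits);
        this.handler = handler;
    }

    public void enqueue(String msg) {
        buffer.add(msg);
    }

    public void start() {
        Thread worker = new Thread(() -> {
            while (running) {
                try {
                    String msg = buffer.take(); // block until a message arrives
                    semaphore.acquire();        // block until a permit is free
                    handler.accept(msg);        // dispatch to downstream provider
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    public void stop() {
        running = false;
    }
}
```

Because both take() and acquire() block, the worker consumes no CPU while idle and naturally drains the backlog at exactly the permitted rate once permits are refilled.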
This is the interface implemented by any class that can handle overflowing (queued) messages for the Throttler. The most important methods are add and take (blocking). State-management methods are also included to handle failures, i.e., to allow the throttler to survive restarts without losing messages. A production implementation would use a durable backend such as Redis, but a simplistic in-memory implementation is provided in SimpleThrottlingBuffer.
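A hedged reconstruction of such a buffer contract, together with a trivial in-memory implementation; only add and take are named in the article, so the state-management method names (load, persist) are assumptions:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical reconstruction of the buffer contract described in the article.
public interface ThrottlingBuffer<T> {
    void add(T message);                  // enqueue an overflowing message
    T take() throws InterruptedException; // block until a message is available
    void load();                          // restore state after a restart (assumed name)
    void persist();                       // save state on shutdown (assumed name)
}

// Simplistic in-memory stand-in; a production version would be backed by Redis.
class SimpleThrottlingBuffer implements ThrottlingBuffer<String> {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    public void add(String message) {
        queue.add(message);
    }

    public String take() throws InterruptedException {
        return queue.take();
    }

    public void load() { /* no-op for the in-memory sketch */ }

    public void persist() { /* no-op for the in-memory sketch */ }
}
```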
Source code is available on GitHub. Refer to the README for instructions on demonstrating the solution.