Bucket4j + Infinispan: A Deep Dive Into Implementation

This deep dive explores integrating Bucket4j with Embedded Infinispan to implement a distributed rate limiter with logic execution directly on the data-owning node.

By Arkadii Osheev · May. 01, 26 · Analysis

In distributed systems, the biggest challenge for rate limiting is state. How do you ensure that two parallel requests hitting different cluster nodes don't "double-spend" the same token?

In this article, we dive into the implementation details of the integration between the Bucket4j rate-limiting framework and Embedded Infinispan (not the remote Hot Rod client). This setup creates a data grid across the pods of a single application, allowing for seamless, distributed token management.

Note: This guide is based on Bucket4j 8.16.1, Infinispan 16.1.0, and infinispan-protostream 6.0.4. While the logic should hold for earlier versions, behavior in Infinispan < 10 may require additional verification.
To keep this guide focused and readable, I have omitted some of the more granular implementation details.

Main Actors

The Embedded Infinispan Layer: Functional Map API 

Infinispan is a high-performance key-value store designed for low latency. For Bucket4j, the most critical feature is the Functional Map API.

Java
 
    @Experimental
   interface ReadWriteMap<K, V> extends FunctionalMap<K, V> { 
   ...
   }


Unlike a standard cache.put(), the Functional Map allows us to execute a lambda (an entry processor) directly on the node that owns the data, as an atomic read-modify-write on that key. ReadWriteMap.eval(key, entryProcessor) offers three major advantages:

  • Atomicity: Locks are acquired before the lambda executes.
  • Data locality: The function travels to the data, minimizing network traffic.
  • Non-blocking: It returns a CompletableFuture, fitting perfectly into modern asynchronous architectures.
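The following single-JVM toy model of eval(key, function) makes these three properties concrete. It is a sketch under stated assumptions and does not reproduce Infinispan's API. ToyReadWriteMap, Update, and EvalDemo are hypothetical names. ConcurrentHashMap.compute stands in for the owner node's per-key lock, and CompletableFuture.supplyAsync stands in for the asynchronous remote call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// What the entry processor returns: the value to store and the result for the caller.
record Update<V, R>(V newValue, R result) {}

// Hypothetical toy model of ReadWriteMap.eval(); NOT Infinispan's API.
class ToyReadWriteMap<K, V> {
    private final ConcurrentHashMap<K, V> store = new ConcurrentHashMap<>();
    private final ExecutorService ownerThreads = Executors.newFixedThreadPool(4);

    // Runs fn atomically for this key: compute() blocks other updates of the same key
    // while fn reads the current value (null if absent) and produces the new one.
    <R> CompletableFuture<R> eval(K key, Function<V, Update<V, R>> fn) {
        return CompletableFuture.supplyAsync(() -> {
            List<R> resultHolder = new ArrayList<>(1);
            store.compute(key, (k, current) -> {
                Update<V, R> update = fn.apply(current);
                resultHolder.add(update.result());
                return update.newValue();
            });
            return resultHolder.get(0);
        }, ownerThreads);
    }

    void shutdown() {
        ownerThreads.shutdown();
    }
}

class EvalDemo {
    // Fires `requests` concurrent "consume one token" calls at a bucket that starts
    // with `tokens` tokens and returns how many calls were granted.
    static int grantedCount(int requests, long tokens) {
        ToyReadWriteMap<String, Long> map = new ToyReadWriteMap<>();
        map.<Boolean>eval("SYSTEM", current -> new Update<Long, Boolean>(tokens, true)).join();

        List<CompletableFuture<Boolean>> calls = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            calls.add(map.<Boolean>eval("SYSTEM", current -> current > 0
                    ? new Update<Long, Boolean>(current - 1, true)
                    : new Update<Long, Boolean>(current, false)));
        }

        int granted = 0;
        for (CompletableFuture<Boolean> call : calls) {
            if (call.join()) {
                granted++;
            }
        }
        map.shutdown();
        return granted;
    }
}
```

Every call runs inside compute() for the same key, so no two consumers can read the same balance. As a result, exactly 50 of 100 concurrent requests succeed against a 50-token bucket.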

A detailed walk through Infinispan's cache internals would be very verbose, so we skip it to keep the flow clear.

The Bucket4j Layer: Abstraction and Proxies

Bucket

A Bucket is a stateful object that maintains a current balance of tokens and a set of rules (bandwidths) for how those tokens are consumed and refilled over time. It uses the Builder pattern to configure not just the bucket’s behavior, but also its execution model (how and where the logic is processed; we look at this more closely later). It has a sibling, AsyncBucketProxy, which provides methods returning CompletableFuture objects. For the remainder of this article, when I refer to a 'bucket,' I am specifically referring to the AsyncBucketProxy.

For simplicity, let's pretend that it has only the following method:

Java
 
public interface AsyncBucketProxy {
   
    CompletableFuture<Boolean> tryConsume(long numTokens);
  
}
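A minimal local implementation of that simplified interface shows what "balance plus bandwidth" means in practice. This is a hypothetical sketch and does not reproduce Bucket4j's implementation. SimplifiedAsyncBucket and GreedyLocalBucket are illustrative names. The clock is injected so the behavior is deterministic. Double arithmetic keeps the code short, although a production limiter would likely use exact integer math. Greedy refill is modeled as tokens accruing continuously in proportion to elapsed time, capped at capacity.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;

// Same shape as the simplified AsyncBucketProxy above; hypothetical name, not a Bucket4j type.
interface SimplifiedAsyncBucket {
    CompletableFuture<Boolean> tryConsume(long numTokens);
}

// Hypothetical local bucket with one bandwidth and greedy refill: tokens accrue
// continuously in proportion to elapsed time and never exceed the capacity.
class GreedyLocalBucket implements SimplifiedAsyncBucket {
    private final long capacity;
    private final double tokensPerNano;
    private final LongSupplier clockNanos; // injected so tests can control time
    private double available;
    private long lastRefillNanos;

    GreedyLocalBucket(long capacity, long refillTokens, Duration refillPeriod, LongSupplier clockNanos) {
        this.capacity = capacity;
        this.tokensPerNano = (double) refillTokens / refillPeriod.toNanos();
        this.clockNanos = clockNanos;
        this.available = capacity;                  // a new bucket starts full
        this.lastRefillNanos = clockNanos.getAsLong();
    }

    @Override
    public synchronized CompletableFuture<Boolean> tryConsume(long numTokens) {
        long now = clockNanos.getAsLong();
        // Refill for the time elapsed since the last call, capped at capacity.
        available = Math.min(capacity, available + (now - lastRefillNanos) * tokensPerNano);
        lastRefillNanos = now;
        boolean granted = available >= numTokens;
        if (granted) {
            available -= numTokens;
        }
        // Local and synchronous, so the future is already complete.
        return CompletableFuture.completedFuture(granted);
    }
}
```

The builder example that follows uses a capacity of 50 and a refill of 10 tokens per second. With those settings, draining the bucket and then waiting two seconds makes roughly 20 tokens available again.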


Of course, you could instantiate a concrete implementation directly through its constructor. However, there is a better option: use the built-in builders, and your flow looks like this:

Java
 
Bucket localBucket = Bucket.builder()
        .addLimit(limit -> limit.capacity(50).refillGreedy(10, Duration.ofSeconds(1)))
        .build();

// or, for a distributed bucket managed by a ProxyManager:
BucketConfiguration configuration = BucketConfiguration.builder()
        .addLimit(limit -> limit.capacity(50).refillGreedy(10, Duration.ofSeconds(1)))
        .build();

AsyncBucketProxy distributedBucket = proxyManager
        .builder()
        .build("SYSTEM", () -> CompletableFuture.completedFuture(configuration));


Pretty neat, right? While Bucket.builder() is responsible for building local buckets, the ProxyManager handles distributed buckets, and that is where things get interesting.

Proxy Manager

The ProxyManager interface (and its base implementation, AbstractProxyManager) is the backbone of Bucket4j's distributed logic. It unifies the flow of building bucket behavior and delegates execution to storage-specific implementations. To achieve this, Bucket4j internally uses the RemoteCommand interface and the Request class.

Java
 
public interface RemoteCommand<T> {

    CommandResult<T> execute(MutableBucketEntry mutableEntry, long currentTimeNanos);
}

Java
 
public class Request<T> implements ComparableByContent<Request<T>> {

    // ...... omitted for clarity
    private final RemoteCommand<T> command;

    public Request(RemoteCommand<T> command /* , ...... other parameters omitted for clarity */) {
        this.command = command;
    }

    // ...... omitted for clarity
}


With the RemoteCommand interface, we can wrap any operation on bucket data and execute it on the remote server. What is still missing is an actor that executes the command: the CommandExecutor (AsyncCommandExecutor for the async bucket).
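The command side of this split can be sketched without any grid at all. The following is a hypothetical, simplified model. ToyRemoteCommand, ToyMutableEntry, ToyState, and ToyTryConsumeCommand are illustrative stand-ins for RemoteCommand, MutableBucketEntry, and a consume command. Results are returned directly instead of being wrapped in a CommandResult. The key point is that the command only sees an entry and a timestamp, and it has no knowledge of which node or which storage it runs on.

```java
// Hypothetical stand-ins for Bucket4j's RemoteCommand / MutableBucketEntry; illustrative only.
final class ToyState {
    final long tokens;

    ToyState(long tokens) {
        this.tokens = tokens;
    }
}

// Wraps the stored state and remembers whether a command changed it.
final class ToyMutableEntry {
    private ToyState state;
    private boolean modified;

    ToyMutableEntry(ToyState initial) {
        this.state = initial;
    }

    boolean exists() { return state != null; }
    ToyState get() { return state; }
    boolean isStateModified() { return modified; }

    void set(ToyState newState) {
        this.state = newState;
        this.modified = true;
    }
}

// A command only sees the entry and a timestamp, never the node or the storage it runs on.
interface ToyRemoteCommand<T> {
    T execute(ToyMutableEntry entry, long currentTimeNanos);
}

final class ToyTryConsumeCommand implements ToyRemoteCommand<Boolean> {
    private final long tokensToConsume;

    ToyTryConsumeCommand(long tokensToConsume) {
        this.tokensToConsume = tokensToConsume;
    }

    @Override
    public Boolean execute(ToyMutableEntry entry, long currentTimeNanos) {
        // Refill based on currentTimeNanos is omitted to keep the sketch short.
        if (!entry.exists() || entry.get().tokens < tokensToConsume) {
            return false; // state untouched, so nothing needs to be written back
        }
        entry.set(new ToyState(entry.get().tokens - tokensToConsume));
        return true;
    }
}
```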

AbstractProxyManager creates the CommandExecutor internally and injects it into the bucket it builds. Look at the example below.

Java
 
    @Override
    public AsyncBucketProxy build(K key, Supplier<CompletableFuture<BucketConfiguration>> configurationSupplier) {
        if (configurationSupplier == null) {
            throw BucketExceptions.nullConfigurationSupplier();
        }

        AsyncCommandExecutor commandExecutor = new AsyncCommandExecutor() {
            @Override
            public <T> CompletableFuture<CommandResult<T>> executeAsync(RemoteCommand<T> command) {
                ExpirationAfterWriteStrategy expirationStrategy = clientSideConfig.getExpirationAfterWriteStrategy().orElse(null);
                Request<T> request = new Request<>(command, getBackwardCompatibilityVersion(), getClientSideTime(), expirationStrategy);
                // Pay attention! Execution is delegated to the storage-specific ProxyManager.
                Supplier<CompletableFuture<CommandResult<T>>> futureSupplier = () -> AbstractProxyManager.this.executeAsync(key, request);
                return clientSideConfig.getExecutionStrategy().executeAsync(futureSupplier);
            }
        };
        commandExecutor = asyncRequestOptimizer.apply(commandExecutor);

        return new DefaultAsyncBucketProxy(commandExecutor, recoveryStrategy, configurationSupplier, implicitConfigurationReplacement, listener);
    }

The Secret Sauce: By delegating executeAsync to the ProxyManager, Bucket4j separates the rate-limiting logic from the underlying storage technology. This is why the same library can support Redis, Postgres, or Infinispan just by switching the manager.
Code: Supplier<CompletableFuture<CommandResult<T>>> futureSupplier = () -> AbstractProxyManager.this.executeAsync(key, request);
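The same delegation can be reduced to a few lines. This sketch illustrates the idea under explicit assumptions and does not reproduce Bucket4j's code. ToyProxyManager, InMemoryProxyManager, ToyCommand, and CommandOutcome are hypothetical names. A command is modeled as a function from the current token count (null when the bucket does not exist yet) to a new count plus a result. The abstract class owns the bucket logic, and the subclass only decides how to run a command atomically against its storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

// Result of running a command: the token count to store and the value for the caller.
record CommandOutcome<T>(Long newTokens, T result) {}

// A command maps the current token count (null = no bucket yet) to an outcome.
interface ToyCommand<T> extends Function<Long, CommandOutcome<T>> {}

// Hypothetical, storage-agnostic manager: owns the bucket logic, delegates execution.
abstract class ToyProxyManager<K> {
    // Storage-specific hook: run the command atomically against the stored state.
    protected abstract <T> CompletableFuture<T> executeAsync(K key, ToyCommand<T> command);

    CompletableFuture<Boolean> tryConsume(K key, long capacity, long tokens) {
        ToyCommand<Boolean> command = current -> {
            long available = current == null ? capacity : current; // lazily create a full bucket
            return available >= tokens
                    ? new CommandOutcome<Boolean>(available - tokens, true)
                    : new CommandOutcome<Boolean>(available, false);
        };
        // Mirrors futureSupplier: the subclass decides where and how the command runs.
        Supplier<CompletableFuture<Boolean>> futureSupplier = () -> executeAsync(key, command);
        return futureSupplier.get();
    }
}

// One possible backend; a Redis-, Postgres-, or Infinispan-backed subclass would replace it.
class InMemoryProxyManager<K> extends ToyProxyManager<K> {
    private final Map<K, Long> store = new ConcurrentHashMap<>();

    @Override
    protected <T> CompletableFuture<T> executeAsync(K key, ToyCommand<T> command) {
        List<T> result = new ArrayList<>(1);
        store.compute(key, (k, current) -> {
            CommandOutcome<T> outcome = command.apply(current);
            result.add(outcome.result());
            return outcome.newTokens();
        });
        return CompletableFuture.completedFuture(result.get(0));
    }
}
```

Swapping InMemoryProxyManager for a subclass backed by another store would leave tryConsume untouched. That independence is exactly what the futureSupplier indirection provides.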

With this knowledge, let's jump into InfinispanProxyManager and look at the details.

Infinispan Proxy Manager

The first thing the documentation points out is that Bucket4j requires dedicated serialization for Infinispan. This is crucial because Infinispan moves data between nodes as byte streams. The following snippet is taken from the documentation:

Java
 
import io.github.bucket4j.grid.infinispan.serialization.Bucket4jProtobufContextInitializer;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
...
GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder();
builder.serialization().addContextInitializer(new Bucket4jProtobufContextInitializer());
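To see why a byte-level format matters, consider what has to cross the wire: the bucket state itself. The processor shown later in this article works on ReadWriteEntryView<K, byte[]>, which means bucket state is stored as raw bytes. The following sketch round-trips a hypothetical two-field state through DataOutputStream. ToyBucketState is an illustrative name, and its layout (a version byte followed by two longs) is an assumption made for illustration. It is not Bucket4j's actual binary format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical state layout for illustration only; Bucket4j uses its own serialization helpers.
record ToyBucketState(long availableTokens, long lastRefillNanos) {

    byte[] toBytes() {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(1);                 // format version, checked when reading
            out.writeLong(availableTokens);
            out.writeLong(lastRefillNanos);
            out.flush();
            return bytes.toByteArray();       // 1 + 8 + 8 = 17 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static ToyBucketState fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readByte();
            if (version != 1) {
                throw new IllegalArgumentException("Unsupported state version " + version);
            }
            return new ToyBucketState(in.readLong(), in.readLong());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```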


However, our focus is the execution path. Let's have a look at it.

Java
 
    @Override
    public <T> CompletableFuture<CommandResult<T>> executeAsync(K key, Request<T> request) {
        try {
            InfinispanProcessor<K, T> entryProcessor = new InfinispanProcessor<>(request);
            CompletableFuture<byte[]> resultFuture = readWriteMap.eval(key, entryProcessor);
            return resultFuture.thenApply(resultBytes -> deserializeResult(resultBytes, request.getBackwardCompatibilityVersion()));
        } catch (Throwable t) {
            CompletableFuture<CommandResult<T>> fail = new CompletableFuture<>();
            fail.completeExceptionally(t);
            return fail;
        }
    }

    @Override
    public <T> CommandResult<T> execute(K key, Request<T> request) {
        // sync copy of executeAsync
    }
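The try/catch in executeAsync turns an exception thrown while building or submitting the processor into a failed future, so callers deal with a single, asynchronous error channel. The sketch below isolates that pattern. AsyncBridge and evalAndDecode are hypothetical names. The remoteEval and decode parameters stand in for readWriteMap.eval and deserializeResult. The sketch uses CompletableFuture.failedFuture (Java 9+), which is equivalent to the manual new CompletableFuture<>() plus completeExceptionally pattern above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of the executeAsync() shape; not Bucket4j's code.
final class AsyncBridge {
    static <R> CompletableFuture<R> evalAndDecode(
            Function<String, CompletableFuture<byte[]>> remoteEval, // stand-in for readWriteMap.eval
            String key,
            Function<byte[], R> decode) {                          // stand-in for deserializeResult
        try {
            // Asynchronous path: decode the raw bytes once the owner node answers.
            return remoteEval.apply(key).thenApply(decode);
        } catch (Throwable t) {
            // Synchronous failure: surface it through the future instead of throwing.
            return CompletableFuture.failedFuture(t);
        }
    }
}
```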


Here we meet a special class, InfinispanProcessor<K, T>. What secrets does it hide?

Java
 
import io.github.bucket4j.distributed.remote.AbstractBinaryTransaction;
import io.github.bucket4j.distributed.remote.RemoteBucketState;
import io.github.bucket4j.distributed.remote.Request;
import io.github.bucket4j.distributed.serialization.InternalSerializationHelper;
import io.github.bucket4j.util.ComparableByContent;
import org.infinispan.functional.EntryView;
import org.infinispan.functional.MetaParam;
import org.infinispan.util.function.SerializableFunction;

public class InfinispanProcessor<K, R> implements
        SerializableFunction<EntryView.ReadWriteEntryView<K, byte[]>, byte[]>,
        ComparableByContent<InfinispanProcessor> {

    private final byte[] requestBytes;

    public InfinispanProcessor(Request<R> request) {
        this.requestBytes = InternalSerializationHelper.serializeRequest(request);
    }

    //... omitted for clarity
    public byte[] apply(EntryView.ReadWriteEntryView<K, byte[]> entry) {
        if (requestBytes.length == 0) {
            // it is the marker to remove bucket state
            if (entry.find().isPresent()) {
                entry.remove();
                return new byte[0];
            }
        }

        return new AbstractBinaryTransaction(requestBytes) {
            // ... omitted for clarity

            @Override
            protected void setRawState(byte[] newStateBytes, RemoteBucketState newState) {
                ExpirationAfterWriteStrategy expirationStrategy = getExpirationStrategy();
                long ttlMillis = expirationStrategy == null ? -1 : expirationStrategy.calculateTimeToLiveMillis(newState, getCurrentTimeNanos());
                if (ttlMillis > 0) {
                    entry.set(newStateBytes, new MetaParam.MetaLifespan(ttlMillis));
                } else {
                    entry.set(newStateBytes);
                }
            }
        }.execute();
    }
 }
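The two branches of apply() are easy to lose in the omitted code, so here they are in isolation. This is a hypothetical sketch. ToyEntryView stands in for ReadWriteEntryView. A stored lifespan value stands in for MetaParam.MetaLifespan. The request bytes are stored directly as the new state rather than being decoded into a command. The sketch also simplifies one detail: an empty request returns immediately even when no entry exists, whereas the original falls through to the transaction in that case.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for EntryView.ReadWriteEntryView<K, byte[]>; not Infinispan's API.
final class ToyEntryView {
    byte[] value;               // null means "no entry"
    long lifespanMillis = -1;   // -1 means "never expires"

    Optional<byte[]> find() { return Optional.ofNullable(value); }

    void remove() {
        value = null;
        lifespanMillis = -1;
    }

    void set(byte[] newValue) {
        set(newValue, -1);
    }

    void set(byte[] newValue, long lifespan) {
        value = newValue;
        lifespanMillis = lifespan;
    }
}

// Mirrors the two branches of InfinispanProcessor.apply().
final class ToyProcessor implements Function<ToyEntryView, byte[]> {
    private final byte[] requestBytes;
    private final long ttlMillis; // hypothetical: precomputed instead of an ExpirationAfterWriteStrategy

    ToyProcessor(byte[] requestBytes, long ttlMillis) {
        this.requestBytes = requestBytes;
        this.ttlMillis = ttlMillis;
    }

    @Override
    public byte[] apply(ToyEntryView entry) {
        if (requestBytes.length == 0) {
            // Empty request: the marker that asks to delete the bucket state.
            entry.find().ifPresent(existing -> entry.remove());
            return new byte[0];
        }
        // Simplification: the request bytes become the new state directly.
        byte[] newState = requestBytes.clone();
        if (ttlMillis > 0) {
            entry.set(newState, ttlMillis); // store with a lifespan so idle buckets expire
        } else {
            entry.set(newState);
        }
        return newState;
    }
}
```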


And here is the trick to integrate Infinispan and Bucket4j: SerializableFunction<EntryView.ReadWriteEntryView<K, byte[]>, byte[]>. This is a special interface that allows the system to ship a function to a different node to execute arbitrary code. Infinispan accepts this serializable function and executes it on the remote node where the data actually resides.
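Shipping a function comes down to three steps: serialize it on the caller, deserialize it on the owner, then apply it. The sketch below imitates this with plain Java serialization. That choice is an assumption made to avoid an Infinispan dependency, since Bucket4j registers its own marshalling through Bucket4jProtobufContextInitializer instead. ShippableFunction mirrors the shape of SerializableFunction (a Function that is also Serializable). AddTokensFunction and Wire are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Same shape as Infinispan's SerializableFunction: a Function that is also Serializable.
interface ShippableFunction<T, R> extends Function<T, R>, Serializable {}

// Only the instance state (delta) is written to the stream; the class's bytecode is not.
final class AddTokensFunction implements ShippableFunction<Long, Long> {
    private static final long serialVersionUID = 1L;
    private final long delta;

    AddTokensFunction(long delta) {
        this.delta = delta;
    }

    @Override
    public Long apply(Long current) {
        return current + delta;
    }
}

final class Wire {
    // Simulates a hop between nodes: serialize on the caller, deserialize on the owner.
    static <T extends Serializable> T ship(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject(); // needs the class on this side's classpath
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not ship " + object, e);
        }
    }
}
```

Note that readObject needs AddTokensFunction on the receiving side's classpath. If the class were missing, deserialization would fail with ClassNotFoundException, which is the cluster-homogeneity concern described next.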

Crucial requirement: the InfinispanProcessor class must be present on both the sender and the receiver nodes, because only the processor's serialized state travels over the wire, not its bytecode. If your pods are running different versions of the application, the InfinispanProcessor can fail to deserialize on the owner node.

One question remains: what happens inside AbstractBinaryTransaction.execute()? Let's dive into the code.

Java
 
    public byte[] execute() {
        // ... logic to deserialize request ...
        try {
            RemoteBucketState currentState = null;
            if (exists()) {
                byte[] stateBytes = getRawState(); // read the state stored on the owner node
                currentState = deserializeState(stateBytes);
            }
            MutableBucketEntry entryWrapper = new MutableBucketEntry(currentState);
            currentTimeNanos = request.getClientSideTime() != null
                    ? request.getClientSideTime()
                    : System.currentTimeMillis() * 1_000_000;
            RemoteCommand<?> command = request.getCommand();
            CommandResult<?> result = command.execute(entryWrapper, currentTimeNanos);

            if (entryWrapper.isStateModified()) {
                RemoteBucketState newState = entryWrapper.get();
                setRawState(serializeState(newState, backwardCompatibilityVersion), newState);
            }

            return serializeResult(result, request.getBackwardCompatibilityVersion());
        }
        // omitted for clarity
    }


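This code is the execution logic that runs on the owner node. It loads the current bucket state and executes the command against it at the chosen time: the client-side time if provided, otherwise the node's wall-clock time converted to nanoseconds. It then writes the new state back only if it was modified. Finally, it serializes the result, which tells the caller whether the request has enough tokens to proceed or must be rejected or retried.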
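The steps of execute() can be reduced to a small, runnable model. This is a hypothetical sketch and does not reproduce AbstractBinaryTransaction. The state is a single long (available tokens) stored as 8 bytes in an AtomicReference, the command is "consume N", refill is omitted, and ToyBinaryTransaction and TxOutcome are illustrative names. The model keeps the two details worth noticing: the fallback from an optional client-side time to System.currentTimeMillis() * 1_000_000, and the rule that state is written back only when it changed.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;

// What the caller gets back, plus two facts exposed for inspection.
record TxOutcome(boolean consumed, boolean stateWritten, long timeNanos) {}

// Hypothetical, simplified take on AbstractBinaryTransaction.execute(); not Bucket4j's code.
final class ToyBinaryTransaction {
    private final Long clientSideTimeNanos; // optional, like Request.getClientSideTime()
    private final long tokensToConsume;

    ToyBinaryTransaction(Long clientSideTimeNanos, long tokensToConsume) {
        this.clientSideTimeNanos = clientSideTimeNanos;
        this.tokensToConsume = tokensToConsume;
    }

    // rawState stands in for the entry on the owner node (null = bucket does not exist).
    TxOutcome execute(AtomicReference<byte[]> rawState, long initialTokens) {
        // 1. Load and deserialize the current state, if any.
        byte[] stateBytes = rawState.get();
        long available = stateBytes == null ? initialTokens : ByteBuffer.wrap(stateBytes).getLong();

        // 2. Choose the time source: the client's clock if supplied, else wall-clock millis as nanos.
        long nowNanos = clientSideTimeNanos != null
                ? clientSideTimeNanos
                : System.currentTimeMillis() * 1_000_000;

        // 3. Run the command (refill based on nowNanos is omitted for brevity).
        boolean consumed = available >= tokensToConsume;
        boolean modified = consumed || stateBytes == null;

        // 4. Write back only when the state changed, then return the result.
        if (modified) {
            long newAvailable = consumed ? available - tokensToConsume : available;
            rawState.set(ByteBuffer.allocate(Long.BYTES).putLong(newAvailable).array());
        }
        return new TxOutcome(consumed, modified, nowNanos);
    }
}
```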

Execution Flow

We have now introduced the main parts of the algorithm and are ready to combine them into a complete picture of how the system answers the final question: can we consume the tokens or not?

As discussed, a lot of magic hides in the building phase, where the ProxyManager wraps the CommandExecutor and Requests. Let's unwrap this envelope and show what happens in the integration.

[Figure: execution flow diagram. *Key: Infinispan uses consistent hashing internally to locate the node that owns the key.]
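Putting the pieces together, a tryConsume call on the bucket travels through these steps:

  1. The AsyncCommandExecutor wraps the command in a Request.
  2. InfinispanProxyManager wraps the Request in an InfinispanProcessor.
  3. readWriteMap.eval routes the processor to the node that owns the key.
  4. AbstractBinaryTransaction.execute runs on that node.
  5. The serialized result comes back and is deserialized into a CommandResult.

The routing step relies on consistent hashing. The sketch below shows the idea with a classic hash ring. ToyHashRing is a hypothetical name, and MD5 and 64 virtual points per node are arbitrary choices. Infinispan's real implementation is segment-based and differs in detail.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical consistent-hash ring: each node gets several virtual points on a ring of
// hash values, and a key belongs to the first point at or after the key's own hash.
final class ToyHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    ToyHashRing(List<String> nodes, int virtualPointsPerNode) {
        for (String node : nodes) {
            for (int i = 0; i < virtualPointsPerNode; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    String ownerOf(String key) {
        SortedMap<Long, String> tail = ring.tailMap(hash(key));
        // Wrap around to the first point when the key hashes past the last one.
        return tail.isEmpty() ? ring.firstEntry().getValue() : ring.get(tail.firstKey());
    }

    // First 8 bytes of the MD5 digest, read as a signed long.
    private static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A useful property of the ring is that removing a node that does not own a key leaves that key's owner unchanged. Only the buckets that belonged to the departed node move.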

Summary

Integrating Bucket4j with Embedded Infinispan offers a sophisticated solution for distributed rate limiting by moving logic to the data.

  • Data locality: By using readWriteMap.eval(), the rate-limiting decision is executed directly on the node that owns the bucket's state, minimizing network hops.
  • Atomic consistency: Infinispan runs the InfinispanProcessor atomically on the key's owner node, solving the "double-spend" problem without cluster-wide distributed locks.
  • Performance: Operating at the byte[] level ensures that state transitions are extremely fast and the memory footprint remains small.

For production, ensure cluster homogeneity: Your Bucket4j versions and application bytecode must be identical across all pods to avoid serialization errors. This setup allows you to build a self-contained, high-performance rate-limiting grid that scales horizontally with your application.


Opinions expressed by DZone contributors are their own.
