
Get Your Internal Queues Under Control in Java


There are many good reasons to use internal queues in your program. Most of the common patterns share the same principle: divide your processing into two separate parts so that each part can work autonomously. A queue is the best way to transfer an object from one thread to another while ensuring proper visibility of all fields belonging to the transferred object. This common pattern is called the Producer-Consumer pattern. If you want to read more about it, please visit my previous post, Boost of Parallelism Using Producer-Consumer Pattern.
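To make the principle concrete, here is a minimal sketch of the pattern using a JDK BlockingQueue (the task names and the poison-pill sentinel are illustrative, not part of any particular library):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerSketch {

    // Sentinel value that tells the consumer to stop.
    static final String POISON_PILL = "POISON_PILL";

    // Consumes tasks until the poison pill arrives; returns what was processed.
    static List<String> consume(BlockingQueue<String> queue) throws InterruptedException {
        List<String> processed = new ArrayList<>();
        while (true) {
            String task = queue.take(); // blocks until a task is available
            if (POISON_PILL.equals(task)) {
                return processed;
            }
            processed.add(task);
        }
    }

    public static void main(String[] args) throws Exception {
        // The bounded queue hands tasks from the producer thread to the consumer
        // thread and guarantees visibility of the transferred object's fields.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            try {
                queue.put("task-1"); // put() blocks when the queue is full (backpressure)
                queue.put("task-2");
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> processed = consume(queue);
        producer.join();
        System.out.println(processed); // [task-1, task-2]
    }
}
```

The bounded capacity is what gives you backpressure: a slow consumer eventually blocks the producer instead of letting the queue grow without limit.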

However, today, I would like to focus more on potential troubles, monitoring, and how to avoid losing messages in your program.

What Could Go Wrong?!

Let's assume you're working on a microservice application running in a cloud environment with 5 instances, and that you usually deploy a new version every couple of days. Your application has a REST endpoint that enqueues a new task to an internal queue for processing, immediately returns OK, and then processes the task asynchronously on a different thread.
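That enqueue-and-return-OK shape can be sketched without any web framework; the class and method names below are made up for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EnqueueAndReturn {

    // Single worker draining the executor's internal queue asynchronously.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Plays the role of the REST handler: enqueue and answer immediately.
    public String handleRequest(Runnable task) {
        executor.submit(task); // lands in the executor's internal queue
        return "OK";           // the client hears back before processing happens
    }

    // Stops accepting tasks and waits for the queue to drain.
    public boolean shutdownAndAwait(long timeoutMs) throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

The gap between "OK was returned" and "the task was actually processed" is exactly where the trouble described next comes from.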

One day, you have to release a new version of your microservice. That's pretty easy in the cloud: just push a button and the instances will be redeployed one after another without any downtime.

At the moment you clicked that button, you probably made a terrible mistake. You lost all your enqueued tasks, and you can expect a lot of complaints from your clients (hopefully, no money was involved ¯\_(ツ)_/¯ ).

There are actually two ways to fix this situation:

  • You remove the internal queue and use an external queue (e.g., RabbitMQ) instead, and don't acknowledge a task as processed until the very end of its processing. If your deployment cuts it off somewhere in the middle of processing, the task can be reprocessed when the new version of your application is up and running.
  • You disconnect all callers from your application to stop filling up the internal queue, wait until all enqueued tasks have been processed, and only then trigger the deployment.

However, how can you tell that all tasks have already been processed? How many tasks were sitting in your internal queue? Tens, hundreds, or thousands? You probably don't know; it's very hard to guess the processing-time ratio between the publishers and the consumers of your queue.

In general, bounded queues tend to be either completely full or completely empty, depending on whether the processing-time ratio between your publisher and consumer is stable or volatile over time. It's absolutely fine if your queue becomes fairly occupied during a peak (let's say between 8 and 11 PM) and you have enough time to process the backlog overnight, provided you are willing to sacrifice the latency of individual tasks, of course.

It's even worse if you use an unbounded queue to keep your unprocessed tasks: if your publishers are even slightly faster than your consumers, the queue can grow very large over the lifetime of your application.
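One defensive option is a bounded queue whose enqueue fails fast instead of letting memory grow; the capacity and timeout below are illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedEnqueue {

    private final BlockingQueue<String> queue;

    public BoundedEnqueue(int capacity) {
        // Fixed capacity: the imbalance between publisher and consumer
        // becomes visible instead of silently eating heap.
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Waits briefly for free capacity, then gives up and lets the caller
    // react: retry, shed load, or push back to the client.
    public boolean tryEnqueue(String task) throws InterruptedException {
        return queue.offer(task, 100, TimeUnit.MILLISECONDS);
    }

    public String takeTask() throws InterruptedException {
        return queue.take();
    }
}
```

A `false` return from `tryEnqueue` is your early warning that consumers are lagging; with an unbounded queue, that signal simply doesn't exist.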

This is the situation when you run your own code and can decide what kind of queue to use. You may also encounter situations where the internal queue is managed by a framework used in your application. For now, let's focus on the case where everything is in your hands and you have a chance to change the internal queue that is actually used.

How to Set Up the Proper Insights

Let's agree that we need more information about our internal queues, and that we cannot just assume the queue happens to be empty at the moment we push a new version of our application to production. Unfortunately, the JDK queues don't expose this kind of information out of the box. Let's dig in a bit and try to expose something on our own.

Start With the Basics

As a first step, we can expose some basic information that is already available from the JDK's ThreadPoolExecutor and its backing Queue.

import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Stream;

public interface QueueMonitor {

    ThreadPoolExecutor executor();

    /**
     * Returns {@code true} if there is any thread executing any task.
     *
     * @return {@code true} if there is any active task.
     */
    default boolean isRunning() {
        return executor().getActiveCount() > 0;
    }

    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    default int activeCount() {
        return executor().getActiveCount();
    }

    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    default long completedTasksTotal() {
        return executor().getCompletedTaskCount();
    }

    /**
     * Returns the approximate total number of tasks that have ever been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation.
     *
     * @return the number of tasks
     */
    default long enqueuedTasksTotal() {
        return executor().getTaskCount();
    }

    /**
     * Returns the approximate number of tasks that are currently enqueued
     * and waiting to be scheduled for execution.
     *
     * @return number of enqueued tasks.
     */
    default long enqueuedTasksCurrent() {
        return executor().getQueue().size();
    }

    /**
     * Returns the {@link Stream stream} of currently enqueued tasks
     * in an internal queue.
     *
     * @return stream of enqueued tasks.
     */
    default Stream<Runnable> enqueuedTasks() {
        return executor().getQueue().stream();
    }
}


If your component keeps a ThreadPoolExecutor and implements this interface by providing the executor through the executor method, it automatically gets some basic information about its queue. That information can then be exposed through a custom REST monitoring API or via JMX. Which one to choose depends on whether your service is an internal service that is not exposed to the outside world, or whether you already have HTTP access to your application; if not, JMX is probably the better way, depending on the circumstances and nature of your application.
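For instance, a component owning an executor might plug into the interface like this (the service name is made up, and a trimmed copy of the interface is inlined so the snippet stands alone):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Trimmed, inlined copy of the QueueMonitor interface above, so the
// snippet compiles on its own.
interface QueueMonitorLite {
    ThreadPoolExecutor executor();
    default long enqueuedTasksCurrent() { return executor().getQueue().size(); }
    default long completedTasksTotal() { return executor().getCompletedTaskCount(); }
}

public class FoodPreparationService implements QueueMonitorLite {

    private final ThreadPoolExecutor executor =
            (ThreadPoolExecutor) Executors.newSingleThreadExecutor();

    @Override
    public ThreadPoolExecutor executor() {
        return executor;
    }

    // Enqueued orders become visible through the monitor's default methods.
    public void prepare(Runnable order) {
        executor.execute(order);
    }

    public void shutdown() {
        executor.shutdown();
    }
}
```

The component only has to hand over its executor; the default methods do the rest.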

That's Not Enough. I Want to Know More.

Let's dig a bit deeper to find more information. Currently, we are able to list all enqueued tasks (those not yet being processed) and see some numbers describing how many tasks have flowed through our queue. However, we are still missing information about the currently executed tasks: the exact objects on which we could call some method to get useful information.

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A custom trackable thread pool which can keep and provide a currently running
 * task and is able to execute {@link TrackableRunnable} which keeps useful
 * information about the current execution.
 * <p>
 * This implementation follows the configuration of
 * {@link Executors#newSingleThreadExecutor()}. The tracking stops working
 * with multiple workers; some additional changes would be needed to
 * support them.
 */
public class TrackableSingleThreadPoolExecutor extends ThreadPoolExecutor {

    /*
     * Task must be held as a volatile variable even in SingleThreadedExecutor.
     * - A thread is destroyed and new one is recreated when an exception is thrown and caught.
     */
    private volatile TrackableRunnable activeTask;

    public TrackableSingleThreadPoolExecutor(ThreadFactory threadFactory) {
        super(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), threadFactory);
    }

    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        if (!(runnable instanceof TrackableRunnable)) {
            throw new IllegalArgumentException("Executed task must be an instance of "
                    + TrackableRunnable.class.getSimpleName());
        }

        this.activeTask = (TrackableRunnable) runnable;
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        this.activeTask = null;
    }

    public TrackableRunnable getActiveTask() {
        return activeTask;
    }

    /**
     * Keeps a context with an executed runnable. We can track information
     * about currently executed task.
     */
    public static class TrackableRunnable implements Runnable {

        private final Contextual context;

        public TrackableRunnable(Contextual context) {
            this.context = context;
        }

        @Override
        public void run() {
            // Some interesting computation.
        }

        public Contextual getContext() {
            return context;
        }
    }
}


As mentioned in the Javadoc, this implementation supports only a single worker. I suppose it's not a difficult task to change the implementation to return a list of active tasks, each keeping some contextual information.
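One way to sketch that multi-worker extension is to track active tasks in a concurrent set instead of a single volatile field (the class name is made up):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TrackableThreadPoolExecutor extends ThreadPoolExecutor {

    // One entry per worker thread; safe under concurrent
    // beforeExecute/afterExecute calls from multiple workers.
    private final Set<Runnable> activeTasks = ConcurrentHashMap.newKeySet();

    public TrackableThreadPoolExecutor(int workers) {
        super(workers, workers, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
    }

    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        activeTasks.add(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        activeTasks.remove(runnable);
    }

    // Snapshot of the tasks being executed right now (approximate,
    // like the other executor statistics).
    public Set<Runnable> getActiveTasks() {
        return Set.copyOf(activeTasks);
    }
}
```

The check enforcing TrackableRunnable instances could be kept as in the single-worker version; it's omitted here to keep the sketch short.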

How Can I Present the Information?

You can use two simple ways to publish it:

JMX (Java Management Extensions)

  • You just need to implement your MBean and expose what you want to observe.
  • Start an MBean server so that you can connect to it via, e.g., JVisualVM or other tools.
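A minimal standard MBean for the numbers above might look like this (the object-name domain and bean names are illustrative):

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.ThreadPoolExecutor;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Standard MBean convention: the interface name must be <ClassName>MBean.
interface QueueStatsMBean {
    long getEnqueuedTasksCurrent();
    long getCompletedTasksTotal();
}

public class QueueStats implements QueueStatsMBean {

    private final ThreadPoolExecutor executor;

    public QueueStats(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    @Override
    public long getEnqueuedTasksCurrent() {
        return executor.getQueue().size();
    }

    @Override
    public long getCompletedTasksTotal() {
        return executor.getCompletedTaskCount();
    }

    // Registers the bean on the platform MBean server under a hypothetical
    // domain; it then shows up in JVisualVM/JConsole on the MBeans tab.
    public static void register(ThreadPoolExecutor executor, String name) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        server.registerMBean(new QueueStats(executor),
                new ObjectName("com.example:type=QueueStats,name=" + name));
    }
}
```

Each executor in your application can be registered under its own `name`, mirroring the per-executor entries in the REST example below.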

REST Monitor API

  • Use it only if you are running an internal application, or protect your endpoint appropriately. The output might look like this:
[
  {
    "executor": "food-preparation",
    "active": "spaghetti",
    "enqueued-tasks-current": 0,
    "enqueued-tasks-total": 6,
    "completed-tasks-total": 6,
    "enqueued-tasks": [
      "pizza",
      "caesar salad",
      "cheerios"
    ]
  },
  {
    "executor": "drink-preparation",
    "active": "cuba libre",
    "enqueued-tasks-current": 0,
    "enqueued-tasks-total": 6,
    "completed-tasks-total": 6,
    "enqueued-tasks": [
      "mojito",
      "beer"
    ]
  }
]


How About a Graceful Shutdown of Our Executor?

It's another way that can help drain your queue before your application is restarted in your cloud environment. In general, Kubernetes is able to wait for the JVM to terminate and lets it execute shutdown hooks (http://randomizd.blogspot.com/2017/09/gracefully-shutting-down-java-in.html).

The only thing you need to do is arrange for ThreadPoolExecutor#shutdown() to be called inside a shutdown hook:

Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdown));


However, you can encounter several problems:

  • The termination can be delayed for a long time, especially when your unbounded queue is full of tasks.
  • You need to ensure that you no longer accept any new tasks, because all of them will be rejected by the executor, and you should specify the executor's behavior for this case using a proper implementation of RejectedExecutionHandler.
  • It's good practice to protect tasks (especially the important ones) one more time: implement a mechanism where a rejected message is not acknowledged and is returned to, say, an external queue, where it can wait for a new healthy instance and be processed later. A problem arises when your application is called via a REST API and the invocation is automatically rejected, losing the transaction/task.
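A sketch of such a RejectedExecutionHandler that parks rejected tasks in a fallback queue instead of dropping them; in a real system the fallback would likely be your external broker, and this in-memory queue is only an illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// After shutdown() the executor rejects new tasks; instead of losing them,
// park them somewhere a healthy instance can pick them up later.
public class RequeueOnRejection implements RejectedExecutionHandler {

    private final BlockingQueue<Runnable> fallback = new LinkedBlockingQueue<>();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        // In a real system this would re-publish (or simply not acknowledge)
        // the message on an external queue such as RabbitMQ.
        fallback.offer(task);
    }

    public BlockingQueue<Runnable> rejectedTasks() {
        return fallback;
    }
}
```

Wire it in with `executor.setRejectedExecutionHandler(new RequeueOnRejection())` before calling `shutdown()`.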

Thank you for reading my article and please leave comments below. If you would like to be notified about new posts, then start following me on Twitter.
