The Evolution of the Producer-Consumer Problem in Java
Want to learn more about the evolution of the producer-consumer problem in Java? Check out this post where we look at previous and new ways to handle this problem.
Join the DZone community and get the full member experience.
Join For Free
The producer-consumer problem is a classic example of a multi-process synchronization problem. For most of us, this problem is maybe the first synchronization problem that we studied back in school and were facing parallel algorithms for the first time. Simple as it is, it resumes the biggest challenge in parallel computing — the sharing of a single resource by multiple processes.
Problem Statement
There are two processes, a producer and a consumer, that share a common buffer with a limited size. The producer “produces” data and stores it in the buffer, and the consumer “consumes” the data, removing it from the buffer. Having two processes that run in parallel, we need to make sure that the producer will not put new data in the buffer when the buffer is full and the consumer won’t try to remove data from the buffer if the buffer is empty.
Solution
For solving this concurrency problem, the producer and the consumer will have to communicate with each other. If the buffer is full, the producer will go to sleep and will wait to be notified. After the consumer will remove some data from the buffer, it will notify the producer, and then, the producer will start refilling the buffer again. The same process will happen if the buffer is empty, but in this case, the consumer will wait to be notified by the producer.
If this communication is not done properly, it can lead to a deadlock where both processes will wait for each other.
Classic Approach
Let’s see a typical Java solution to this problem.
package ProducerConsumer;
import java.util.LinkedList;
import java.util.Queue;
public class ClassicProducerConsumerExample {
public static void main(String[] args) throws InterruptedException {
Buffer buffer = new Buffer(2);
Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
buffer.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
buffer.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
static class Buffer {
private Queue<Integer> list;
private int size;
public Buffer(int size) {
this.list = new LinkedList<>();
this.size = size;
}
public void produce() throws InterruptedException {
int value = 0;
while (true) {
synchronized (this) {
while (list.size() >= size) {
// wait for the consumer
wait();
}
list.add(value);
System.out.println("Produced " + value);
value++;
// notify the consumer
notify();
Thread.sleep(1000);
}
}
}
public void consume() throws InterruptedException {
while (true) {
synchronized (this) {
while (list.size() == 0) {
// wait for the producer
wait();
}
int value = list.poll();
System.out.println("Consume " + value);
// notify the producer
notify();
Thread.sleep(1000);
}
}
}
}
}
Here we have two threads, a producer and a consumer thread, which share a common buffer. The producer thread starts producing new elements and stores them in the buffer. If the buffer is full, it goes to sleep and will wait to be notified. Otherwise, it will put a new element in the buffer and notify the consumer. Like I said before, the same process applies to the consumer. If the buffer is empty, the consumer will wait to be notified by the producer. Otherwise, it will remove an element from the buffer
and it will notify the consumer.
As you can see, in the previous example, both jobs are managed by the buffer
object. The threads are just calling buffer.produce()
and buffer.consume()
, and everything is done by these two methods.
This is a debatable subject, but in my opinion, the buffer shouldn’t be responsible for creating or removing the elements. Of course, that depends on what you want to achieve, but in this case, the buffer should be responsible just for storing and pooling elements in a thread-safe manner, not for producing the elements.
So, let’s move the produce and consume logic out of the buffer
object.
package ProducerConsumer;
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumerExample2 {
public static void main(String[] args) throws InterruptedException {
Buffer buffer = new Buffer(2);
Thread producerThread = new Thread(() -> {
try {
int value = 0;
while (true) {
buffer.add(value);
System.out.println("Produced " + value);
value ++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
int value = buffer.poll();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
static class Buffer {
private Queue<Integer> list;
private int size;
public Buffer(int size) {
this.list = new LinkedList<>();
this.size = size;
}
public void add(int value) throws InterruptedException {
synchronized (this) {
while (list.size() >= size) {
wait();
}
list.add(value);
notify();
}
}
public int poll() throws InterruptedException {
synchronized (this) {
while (list.size() == 0) {
wait();
}
int value = list.poll();
notify();
return value;
}
}
}
}
That’s better. Now, the buffer is responsible for storing and removing the elements in a thread-safe manner.
Blocking Queue
However, we can further improve this. In the previous example, we’ve created a buffer that, when storing an element, waits for a slot to become available in case there is no more space, and, on pooling, in case that the buffer is empty, it waits for an element to become available, making the storing and removing operations thread-safe.
But, Java already has a collection for this. It’s called a BlockingQueue
and, as it is described here, this is a queue that is thread-safe to put into and take instances from. It does exactly what we want. So, if we use a BlockingQueue
in our example, we don’t have to implement the waiting and notifying mechanism.
Let’s see how it looks.
package ProducerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumerWithBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
Thread producerThread = new Thread(() -> {
try {
int value = 0;
while (true) {
blockingQueue.put(value);
System.out.println("Produced " + value);
value++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
int value = blockingQueue.take();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
The runnables look exactly as they did before. They produce and consume elements in the same way. The only difference is that here we use a blockingQueue
instead of our buffer
object.
Some Details About the Blocking Queue
There are two types of BlockingQueue:
- Unbounded queue
- Bounded queue
An unbounded queue can grow almost indefinitely, and the add operations are not blocking. You can create an unbounded queue like this:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
In this case, since the add operations are not blocking, the producer doesn’t have to wait when adding new elements. Every time when the producer wants to add a new element, the queue will store it. But, there is catch here. If the consumer doesn’t remove elements faster than the producer is adding new elements, then the memory will fill up and we will get an OutOfMemory
exception.
The bounded queue, instead, has a fixed size. You can create one like this:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
The main difference is that using a bounded queue, if the queue is full and the producer tries to store another element, depending on what method is used for adding, the queue will block until it will be enough space.
There are four methods for adding elements in a blocking queue:
-
add()
– returns true if the insertion was successful, otherwise, it will throw anIllegalStateException
-
put()
– inserts an element into the queue and waits for a free slot if necessary -
offer()
– returns true if the insertion was successful, otherwise, it returns false - offer(E e, long timeout, TimeUnit unit) – inserts an element into the queue if it is not full, or waits for an available slot within a specified timeout
So, if you use the put()
method and the queue is full, the producer will have to wait until there is a free slot. That’s what we used in the previous example, and this will work in the same way as ProducerConsumerExample2.
Using a Thread Pool
What else can we improve here? Let’s analyze what we did. We’ve instantiated two threads, one that puts some elements in the blocking queue, the producer, and another that retrieves elements from the queue, the consumer.
But, good software techniques suggest that creating and destroying threads manually is bad practice. Thread creation is an expensive task. Each thread creation implies the following steps:
- It allocates memory for a thread stack
- The OS creates a native thread corresponding to the Java thread
- Descriptors relating to the thread are added to the JVM internal data structures
Don’t get me wrong. There is nothing wrong with having more threads. That’s how parallelism works. The problem here is that we’ve created them “manually." That’s the bad practice. If we create them manually, besides the creation’s cost, another problem is that we don’t have control over how many of them are running at the same time. For example, if millions of requests are reaching a server app, and for each request, a new thread is created, then millions of threads will run in parallel and this could lead to a thread starvation.
So, we need a way to strategically manage threads. And here comes the thread pools.
Thread pools handle the threads' life cycle based on a selected strategy. It holds a limited number of idle threads and reuses them when it needs to solve tasks. This way, we don’t have to create a new thread every time for a new request, and therefore, we can avoid reaching a thread starvation,
The Java thread pool implementation consists of:
- A task queue
- A collection of worker threads
- A thread factory
- Metadata for managing thread pool state.
For running some tasks concurrently, you have to put them in the task queue. Then, when a thread is available, it will receive a task and run it. The more available threads, the more tasks that are executed in parallel.
Beside the thread lifecycle management, another advantage when working with a thread pool is that when you plan on how to split the work to be executed concurrently, you can think in a more functional way. The unit of parallelism is not the thread anymore; it’s the task. You design some tasks that are executed concurrently, and not some threads that share a common memory and are running in parallel. Thinking in a functional way can help us avoid some common multithreading problems, like deadlocks or data races. Nothing can stop us from reaching into these problems again, but, because using the functional paradigm, we don’t imperatively synchronize the concurrent computation (with locks). This is far less happening than working directly with threads and shared memory. This is not the case in our example since the tasks share a blocking queue, but I just wanted to highlight this advantage.
Here and here you can find more details about thread pools.
With all of this being said, let’s see how our example looks using a thread pool.
package ProducerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumerExecutorService {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
ExecutorService executor = Executors.newFixedThreadPool(2);
Runnable producerTask = () -> {
try {
int value = 0;
while (true) {
blockingQueue.put(value);
System.out.println("Produced " + value);
value++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consumerTask = () -> {
try {
while (true) {
int value = blockingQueue.take();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executor.execute(producerTask);
executor.execute(consumerTask);
executor.shutdown();
}
}
The difference here is that, instead of manually creating and running the consumer and producer threads, we build a thread pool, and it will receive two tasks, a producer and a consumer task. The producer and consumer tasks are actually the same runnables that were used in the previous example. Now, the executor (the thread pool implementation) receives the tasks, and its working threads will execute them.
In our simple case, everything will work the same as before. Just like in previous examples, we still have two threads, and they still produce and consume elements in the same way. So, we don’t have a performance improvement here, but the code looks cleaner. We no longer build the threads manually, but, instead, we just specify what we want. And, we want a way to execute some tasks in parallel.
So, when you use a thread pool, you don’t have to think to threads as the unit of parallelism, but instead, you think to some tasks that are executed concurrently. That’s what you need to know, and the executor will handle the rest. It will receive some tasks, and then, it will execute them using the available working threads.
Summary
First, we saw the “traditional” solution of a consumer-producer problem. We try to not reinvent the wheel when is not necessary, but instead, we want to reuse already tested solutions. So, instead of writing down a wait/notify system, why not use the Java blocking queue that already offers that? And also, we can get rid of manually creating threads when Java provides us with a thread pool that manages thread lifecycle very efficiently already. With these improvements, the solutions of the consumer-producer problem look more reliable and understandable.
Opinions expressed by DZone contributors are their own.
Comments