The Producer and Consumer Design Patterns
The producer/consumer design pattern is a pre-designed solution to separate the two main components by placing a queue in the middle.
Join the DZone community and get the full member experience.
Join For FreeMy previous post brought some questions about the Producer / Consumer pattern even when it was not the idea of the article. I thought that it was a good idea for my next article to talk about my understanding on this design pattern.
The producer / consumer design pattern is a pre-designed solution to separate the two main components by placing a queue in the middle, letting the producers and the consumers execute in different threads.
This way the production of tasks to be consumed is absolutely independent from its consumption. This asynchronism breaks the dependency between the producer and the consumer (execution of the task). It is actually a pretty simple design that can add a lot of performance improvement in certain circumstances.
To put this in context let’s think about a scenario where we could use it. The first thing that comes to my mind is a credit card application process. In this scenario the producers of credit card applications could be someone at home applying online or someone at a Bank branch or a call center working with the prospect. Lets say we have an automated process that can approve or denied applications. Lets assume that all these applications are placed into the database waiting for this process to pick them up and put them in the queue to be processed.
Where the producer and the consumer matches with this idea? The producer(s) can be a piece of code that weaks up regularly (could be even every 100 milliseconds), get applications from the database, creates the appropriate task and put them into the queue to be worked by the consumers.
The consumer would be the process that has the knowledge to make the decision on whether to approve or decline the credit card application.
Because we have this dissociation between the task being created and consumed, the producer is only responsible for putting tasks into the queue and the consumers are only responsible for going to the queue to pick up tasks and do the work. The consumers never stop and as soon as they finish with one task they pick up the next application to work with,
Let's Start Coding
We have two major entities that are the Producer and the Consumer. I’ll start coding into the Producer. Both, the producer and the consumer will be classes being able to run independently from the main java thread. In order to do so, this classes will need to either extend Thread or implement Runnable. To make this decision we need to ask ourselves if this classes will eventually need to extend from other classes. If that would be the case, then you better make them implement Runnable, as you may know in Java you can only extend from one class, but you can implement multiple. Since my example is very simple and is not intended to become something more complex I’ll extend from Thread in both cases.
package com.test.multithread;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
public class TaskProducer extends Thread {
private boolean blnExit = false;
private final List<TaskConsumer> consumers;
private final BlockingQueue<Long> sharedQueue;
public TaskProducer(final BlockingQueue<Long> sharedQueue,
final List<TaskConsumer> consumers) {
this.sharedQueue = sharedQueue;
this.consumers = consumers;
}
@Override
public void run() {
long i = 0;
////////////////////////////////////////////
// PRODUCING THE OBJECTS TO BE CONSUMED
////////////////////////////////////////////
while (!blnExit) {
try {
i++;
sharedQueue.put(Long.valueOf(i));
} catch (final InterruptedException ex) {
ex.printStackTrace();
}
}
/////////////////////////////////
// WAIT UNTIL THE QUEUE IS EMPTY
/////////////////////////////////
while (sharedQueue.size() > 0) {
try {
Thread.sleep(200);
System.out.println("Producer waiting to end.");
} catch (final InterruptedException e) {
break;
}
}
////////////////////////////////////////////
// SEND TO ALL CONSUMERS THE EXIT CONDITION
////////////////////////////////////////////
for (final TaskConsumer consumer : consumers) {
consumer.setExitCondition(true);
}
}
}
As you can see it is a very simple code that creates tasks that consist on a sequential number. That number it’s being put into the queue to be processed later.
There is actually something that needs to be specially mentioned here and that is the use of a BlockingQueue. The reason why we use this type of Queue is because the implementations of BlockingQueue are thread-safe and this is an essential feature in a Producer / Consumer scenario. This way putting and removing elements from the Queue is executed atomically.
Now we’ll see the Consumer code:
package com.test.multithread;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class TaskConsumer extends Thread {
boolean blnExit = false;
private final int id;
private final BlockingQueue<Long> sharedQueue;
public TaskConsumer(final int id, final BlockingQueue<Long> sharedQueue) {
this.id = id;
this.sharedQueue = sharedQueue;
}
public void setExitCondition(final boolean blnDoExit) {
blnExit = blnDoExit;
}
@Override
public void run() {
final Random generator = new Random();
while (!blnExit) {
try {
if (sharedQueue.size() > 0) {
System.out.println("Consumer id:" + id +
" sent email " + sharedQueue.take());
// TO BE REMOVED (ONLY SIMULATES RANDOM WORKING TIME)
final long start = System.currentTimeMillis();
Thread.sleep(generator.nextInt(1000) + 1000);
final long end = System.currentTimeMillis();
} else
Thread.sleep(500);
} catch (final InterruptedException ex) {
ex.printStackTrace();
}
}
System.out.println("Consumer " + id + " exiting");
}
}
Also very simple right?
The consumer will take applications from the queue and simulate the processing (through a randomized working time) to make this more real. Whenever it finishes with the application it take the next one and so on so forth. In order to make this possible the consumer receives an instance of the queue when it’s created and a certain id that identifies it.
Now that we have both parts done, lets put everything together writing the code that actually create instances of these two classes and kick off the process.
package com.test.multithread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConsumerPattern {
private final int queueCapacity = 200;
private int numberOfThreads = 10;
public static void main(final String args[]) {
new ProducerConsumerPattern(20);
}
public ProducerConsumerPattern(final int numberOfThreads) {
if (numberOfThreads <= 0 || numberOfThreads > 100)
throw new IllegalArgumentException("The number of threads should be a number between 1 and 100");
this.numberOfThreads = numberOfThreads;
//Creating shared object
final BlockingQueue<Long> sharedQueue = new LinkedBlockingQueue<Long>(queueCapacity);
// Creating and starting the Consumer Threads
final List<TaskConsumer> consumers = new ArrayList<TaskConsumer>();
for (int i = 0; i <= this.numberOfThreads; i++) {
final TaskConsumer consThread = new TaskConsumer(i, sharedQueue);
consThread.start();
consumers.add(consThread);
}
// Creating and starting the Producer Thread
final TaskProducer prodThread = new TaskProducer(sharedQueue, consumers);
prodThread.start();
}
}
Once again the code is very simple and easy to understand. The main class creates the BlockingQueue and creates one Producer and 20 Consumer objects, passing the queue to each of them to add and remove objects to the queue.
Three small and simple classes and you are ready to run a Producer / Consumer application. The only thing you need to do now is add some real logic to this code and put it to work. It is a really powerful design that let you run potentially thousands of task in the background with minimal complexity and high reliability.
Next article will be about a fairly simple way of adding monitoring capabilities to background processes to simplify the way we control its performance and potentially manipulate the way the process behave.
Don't miss it.
Opinions expressed by DZone contributors are their own.
Comments