DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workloads.

Related

  • How to Use JMS ActiveMQ With Mule 4: Part 1
  • Mastering AI Agents: How Agentic Design Patterns Make Agents Smarter
  • Mastering Concurrency: An In-Depth Guide to Java's ExecutorService
  • Understanding the Integration of Embedded Systems in Consumer Electronics

Trending

  • Integrating Model Context Protocol (MCP) With Microsoft Copilot Studio AI Agents
  • Metrics at a Glance for Production Clusters
  • Manual Sharding in PostgreSQL: A Step-by-Step Implementation Guide
  • Endpoint Security Controls: Designing a Secure Endpoint Architecture, Part 2

The Producer and Consumer Design Patterns

The producer/consumer design pattern is a pre-designed solution to separate the two main components by placing a queue in the middle.

By 
Andres Navarro user avatar
Andres Navarro
·
Jun. 03, 14 · Tutorial
Likes (14)
Comment
Save
Tweet
Share
43.9K Views

Join the DZone community and get the full member experience.

Join For Free

My previous post brought some questions about the Producer / Consumer pattern even when it was not the idea of the article. I thought that it was a good idea for my next article to talk about my understanding on this design pattern.

The producer / consumer design pattern is a pre-designed solution to separate the two main components by placing a queue in the middle, letting the producers and the consumers execute in different threads.

This way the production of tasks to be consumed is absolutely independent from its consumption. This asynchronism breaks the dependency between the producer and the consumer (execution of the task). It is actually a pretty simple design that can add a lot of performance improvement in certain circumstances.

To put this in context let’s think about a scenario where we could use it. The first thing that comes to my mind is a credit card application process. In this scenario the producers of credit card applications could be someone at home applying online or someone at a Bank branch or a call center working with the prospect. Lets say we have an automated process that can approve or denied applications. Lets assume that all these applications are placed into the database waiting for this process to pick them up and put them in the queue to be processed.

Where the producer and the consumer matches with this idea? The producer(s) can be a piece of code that weaks up regularly (could be even every 100 milliseconds), get applications from the database, creates the appropriate task and put them into the queue to be worked by the consumers.

The consumer would be the process that has the knowledge to make the decision on whether to approve or decline the credit card application.

Because we have this dissociation between the task being created and consumed, the producer is only responsible for putting tasks into the queue and the consumers are only responsible for going to the queue to pick up tasks and do the work. The consumers never stop and as soon as they finish with one task they pick up the next application to work with,

Let's Start Coding

We have two major entities that are the Producer and the Consumer. I’ll start coding into the Producer. Both, the producer and the consumer will be classes being able to run independently from the main java thread. In order to do so, this classes will need to either extend Thread or implement Runnable. To make this decision we need to ask ourselves if this classes will eventually need to extend from other classes. If that would be the case, then you better make them implement Runnable, as you may know in Java you can only extend from one class, but you can implement multiple. Since my example is very simple and is not intended to become something more complex I’ll extend from Thread in both cases.

package com.test.multithread;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TaskProducer extends Thread {
private boolean blnExit = false;
private final List<TaskConsumer> consumers;
private final BlockingQueue<Long> sharedQueue;

public TaskProducer(final BlockingQueue<Long> sharedQueue, 
final List<TaskConsumer> consumers) {
this.sharedQueue = sharedQueue;
this.consumers = consumers;
}

@Override
public void run() {
    long i = 0;

////////////////////////////////////////////
// PRODUCING THE OBJECTS TO BE CONSUMED
////////////////////////////////////////////
while (!blnExit) {
try {
i++;
sharedQueue.put(Long.valueOf(i));

} catch (final InterruptedException ex) {
ex.printStackTrace();
}
}

/////////////////////////////////
// WAIT UNTIL THE QUEUE IS EMPTY
/////////////////////////////////
while (sharedQueue.size() > 0) {
try {
Thread.sleep(200);
System.out.println("Producer waiting to end.");
} catch (final InterruptedException e) {
break;
}
}

////////////////////////////////////////////
// SEND TO ALL CONSUMERS THE EXIT CONDITION
////////////////////////////////////////////
for (final TaskConsumer consumer : consumers) {
consumer.setExitCondition(true);
}
}
}

As you can see it is a very simple code that creates tasks that consist on a sequential number. That number it’s being put into the queue to be processed later.

There is actually something that needs to be specially mentioned here and that is the use of a BlockingQueue. The reason why we use this type of Queue is because the implementations of BlockingQueue are thread-safe and this is an essential feature in a Producer / Consumer scenario. This way putting and removing elements from the Queue is executed atomically.

Now we’ll see the Consumer code:

package com.test.multithread;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class TaskConsumer extends Thread {
boolean blnExit = false;
private final int id;
private final BlockingQueue<Long> sharedQueue;

public TaskConsumer(final int id, final BlockingQueue<Long> sharedQueue) {
this.id = id;
this.sharedQueue = sharedQueue;
}

public void setExitCondition(final boolean blnDoExit) {
blnExit = blnDoExit;
}

@Override
public void run() {
final Random generator = new Random();

while (!blnExit) {
try {
if (sharedQueue.size() > 0) {
System.out.println("Consumer id:" + id + 
" sent email " + sharedQueue.take());

// TO BE REMOVED (ONLY SIMULATES RANDOM WORKING TIME)
final long start = System.currentTimeMillis();
Thread.sleep(generator.nextInt(1000) + 1000);
final long end = System.currentTimeMillis();
} else
Thread.sleep(500);

} catch (final InterruptedException ex) {
ex.printStackTrace();
}
}

System.out.println("Consumer " + id + " exiting");
}
}

Also very simple right?

The consumer will take applications from the queue and simulate the processing (through a randomized working time) to make this more real. Whenever it finishes with the application it take the next one and so on so forth. In order to make this possible the consumer receives an instance of the queue when it’s created and a certain id that identifies it.

Now that we have both parts done, lets put everything together writing the code that actually create instances of these two classes and kick off the process.

package com.test.multithread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerPattern {
private final int queueCapacity = 200;
private int numberOfThreads = 10;

public static void main(final String args[]) {
new ProducerConsumerPattern(20);
}

public ProducerConsumerPattern(final int numberOfThreads) {
if (numberOfThreads <= 0 || numberOfThreads > 100) 
throw new IllegalArgumentException("The number of threads should be a number between 1 and 100");

this.numberOfThreads = numberOfThreads;

//Creating shared object
final BlockingQueue<Long> sharedQueue = new LinkedBlockingQueue<Long>(queueCapacity);

// Creating and starting the Consumer Threads
final List<TaskConsumer> consumers = new ArrayList<TaskConsumer>();
for (int i = 0; i <= this.numberOfThreads; i++) {
final TaskConsumer consThread = new TaskConsumer(i, sharedQueue);
consThread.start();
consumers.add(consThread);
}

// Creating and starting the Producer Thread
final TaskProducer prodThread = new TaskProducer(sharedQueue, consumers);
prodThread.start();

}
}

Once again the code is very simple and easy to understand. The main class creates the BlockingQueue and creates one Producer and 20 Consumer objects, passing the queue to each of them to add and remove objects to the queue.

Three small and simple classes and you are ready to run a Producer / Consumer application. The only thing you need to do now is add some real logic to this code and put it to work. It is a really powerful design that let you run potentially thousands of task in the background with minimal complexity and high reliability.

Next article will be about a fairly simple way of adding monitoring capabilities to background processes to simplify the way we control its performance and potentially manipulate the way the process behave.

Don't miss it.

consumer producer Design application Task (computing)

Opinions expressed by DZone contributors are their own.

Related

  • How to Use JMS ActiveMQ With Mule 4: Part 1
  • Mastering AI Agents: How Agentic Design Patterns Make Agents Smarter
  • Mastering Concurrency: An In-Depth Guide to Java's ExecutorService
  • Understanding the Integration of Embedded Systems in Consumer Electronics

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends: