
Concurrency Pattern: Producer and Consumer

By Kapil Viren Ahuja · Aug. 29, 11 · Interview

In a career spanning 15 years, I have come across the Producer and Consumer problem only a few times. In most programming work we perform functions synchronously, and the JVM or the web container handles the complexities of multi-threading on its own. However, certain kinds of use cases require us to manage the threading ourselves. Last week I came across one such use case, which took me back three years to the last time I solved this problem. However, the way it was done last time was very different.

When I first heard the problem statement, I knew instantly what was needed. However, my approach this time was going to be different from last time. It simply had to do with how I view technology today. I will not go into the non-technical side and will jump straight into the problem and its solution. I started by looking at what already existed and came across a couple of posts that helped me channel my thoughts in the right direction.

Problem Statement

We need a solution for a batch migration. We are migrating data from System 1 to System 2, and in the process we need to do three tasks:

  • Load data from Database based on groups
  • Process the data
  • Update the records loaded in step 1 with the modifications

We have to handle hundreds of groups, and each group will have around 40K records. You can imagine how long it would take if we were to perform this exercise synchronously. The image here illustrates the problem.

Producer Consumer: The Problem


Producer and Consumer Pattern

Let us take a look at the Producer Consumer pattern to begin with. If you refer to the problem statement above and look at the image, you will see many entities that are ready with their part of the data, but not enough workers to process all of it. As the producers continue to line up, the queue just keeps growing, and the system starts to tie up threads and takes a long time to finish.

Intermediate Solution

Producer Consumer: The Intermediate Approach

We do have an intermediate solution. Refer to the image and you will immediately notice that the producers are piling up their work in a filing cabinet, and the worker picks up the next task as soon as the previous one is done. However, this approach has some glaring shortcomings:

  1. There is still only one worker who has to do all the work. The external systems may be happy, but the job as a whole is not finished until the worker has completed all of the tasks.
  2. The producers pile up their data in a queue, and holding it needs resources. Just as the cabinet in this example can fill up, so can the JVM's memory. We need to be careful about how much data we place in memory, and in some cases that may not be much.
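The second shortcoming can be made concrete with a small sketch. It assumes a bounded `ArrayBlockingQueue` standing in for the filing cabinet; the class and method names (`BoundedCabinetDemo`, `fileJobs`) are invented for this illustration and are not part of the project code. With a fixed capacity, the cabinet refuses extra jobs instead of growing until memory runs out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original project.
class BoundedCabinetDemo {

    // Tries to file `count` jobs into a cabinet that holds at most `capacity`
    // and returns how many were accepted. Nobody is consuming here, so every
    // job beyond the capacity is rejected: offer() returns false when full.
    static int fileJobs(int capacity, int count) {
        BlockingQueue<Integer> cabinet = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int job = 1; job <= count; job++) {
            if (cabinet.offer(job)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("Accepted " + fileJobs(3, 5) + " of 5 jobs");
    }
}
```

Whether a rejected job should be retried, dropped, or made to wait is a design decision; the blocking `put` used later in this article makes the producer wait.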

The Solution

Producer Consumer: The Solution

The solution is something we see every day in many places, such as the ticket queue at a cinema hall or at petrol pumps. Many people come in to book a ticket, and the more people arrive, the more staff are added to issue tickets. Refer to the image here and you will notice that the producers keep adding their jobs to the cabinet, and we now have more workers to handle the workload.

Java provides the java.util.concurrent package to solve this issue. Until now I had always worked with threading at a much lower level, and this was the first time I was going to work with this package. As I explored the web and read what fellow bloggers had to say, I came across one very good article that explained the use of BlockingQueue very effectively. However, the solutions provided by Dhruba would not have helped me achieve the high throughput that I needed. So, I started to explore the use of ArrayBlockingQueue instead.

The Controller

This is the first class, where the contract between the producers and the consumers is managed. The controller sets up 1 thread for the producer and 2 threads for the consumers. Based on our needs we can create as many threads as required, and we can even read the number from a properties file or do some dynamic magic. For now, we will keep this simple.
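As a rough sketch of the "read it from a properties file" idea, the pool size could be derived from a configured consumer count. The property key `consumer.count`, the default of 2, and the class name `PoolSizing` are all assumptions made up for this example; the project code hard-codes the numbers.

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of the original project.
class PoolSizing {

    // "consumer.count" is an invented property key; falls back to 2 consumers.
    static int consumerCount(Properties props) {
        int configured = Integer.parseInt(props.getProperty("consumer.count", "2"));
        return Math.max(1, configured); // never run with zero consumers
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("consumer.count", "4");

        int consumers = consumerCount(props);
        // One extra thread for the single producer.
        ExecutorService threadPool = Executors.newFixedThreadPool(consumers + 1);
        System.out.println("Pool sized for 1 producer and " + consumers + " consumers");
        threadPool.shutdown();
    }
}
```

In a real setup the `Properties` object would be loaded from a file with `Properties.load` rather than filled in by hand.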

package com.kapil.techieforever.producerconsumer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestProducerConsumer
{
    public static void main(String[] args)
    {
        // One thread for the producer and one for each of the two consumers.
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        try
        {
            Broker broker = new Broker();

            threadPool.execute(new Consumer("1", broker));
            threadPool.execute(new Consumer("2", broker));
            Future<?> producerStatus = threadPool.submit(new Producer(broker));

            // Block until the producer has finished its execution.
            producerStatus.get();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            // Accept no new tasks; the running consumers drain the queue and exit.
            threadPool.shutdown();
        }
    }
}

I am using ExecutorService to create a thread pool and manage it. This is more effective than using bare Thread objects, because the pool takes care of starting, reusing and stopping the threads as needed. You will also notice that I am using the Future class to get the status of the producer thread. Calling get() on the Future blocks the calling thread until the producer has finished, which makes it a nice replacement for the ".join" method on threads. Note: I am not using Future very effectively in this example, so you may have to try a few things as you see fit.

Also, note the Broker class, which is used as the filing cabinet between the producers and the consumers. We will see its implementation in just a little while.
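To show a little more of what Future can do, here is a minimal sketch that is separate from the project code (the class `FutureStatusDemo` and the method `runAndReport` are invented names). Besides blocking like join, `get()` also reports a failure inside the task by throwing an `ExecutionException` that wraps the original exception.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original project.
class FutureStatusDemo {

    // Runs the task on a single worker thread and reports how it ended.
    static String runAndReport(Runnable task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> status = pool.submit(task);
            status.get(); // blocks until the task finishes, much like Thread.join()
            return "completed";
        } catch (ExecutionException e) {
            // get() wraps whatever the task threw.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndReport(() -> { }));
        System.out.println(runAndReport(() -> { throw new IllegalStateException("no data"); }));
    }
}
```

The first call prints "completed" and the second prints "failed: no data", so the controller can tell a clean finish from a crashed producer.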

The Producer

This class is responsible for producing the data that needs to be worked upon.

package com.kapil.techieforever.producerconsumer;

public class Producer implements Runnable
{
    private final Broker broker;

    public Producer(Broker broker)
    {
        this.broker = broker;
    }

    @Override
    public void run()
    {
        try
        {
            for (int i = 1; i <= 5; i++)
            {
                System.out.println("Producer produced: " + i);
                Thread.sleep(100); // simulate the time taken to produce the data
                broker.put(i);
            }

            // Tell the consumers that no more data is coming.
            this.broker.continueProducing = false;
            System.out.println("Producer finished its job; terminating.");
        }
        catch (InterruptedException ex)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            ex.printStackTrace();
        }
    }
}

This class does the simplest thing it can: it adds an integer to the broker. Some key areas to note are:
 1. There is a property on Broker that the producer updates at the end, when it is done producing. It plays the role of what is also known as a "final" or "poison" entry: the consumers use it to know that no more data is coming.
 2. I have used Thread.sleep to simulate that some producers may take more time to produce the data. You can tweak this value and see how the consumers react.
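For comparison, the classic form of the "poison" entry puts a sentinel value into the queue itself instead of setting a flag on the broker. The sketch below is an alternative to, not a copy of, the approach used in this article; the sentinel `-1` (which assumes real data is never negative) and the names `PoisonPillDemo` and `produceAndConsume` are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original project.
class PoisonPillDemo {

    // Sentinel meaning "no more data"; assumes real data is never negative.
    static final Integer POISON = -1;

    static List<Integer> produceAndConsume(int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        List<Integer> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer data = queue.take(); // blocks until something arrives
                    if (POISON.equals(data)) {
                        return; // the producer is done, stop consuming
                    }
                    processed.add(data);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 1; i <= items; i++) {
                queue.put(i);
            }
            queue.put(POISON); // the last entry tells the consumer to stop
            consumer.join();   // after join, the list contents are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(3));
    }
}
```

With several consumers, this variant needs one sentinel per consumer, since each `take` removes the entry it reads.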

The Consumer

This class is responsible for reading the data from the broker and doing its job.

package com.kapil.techieforever.producerconsumer;

public class Consumer implements Runnable
{
    private final String name;
    private final Broker broker;

    public Consumer(String name, Broker broker)
    {
        this.name = name;
        this.broker = broker;
    }

    @Override
    public void run()
    {
        try
        {
            Integer data = broker.get();

            // Keep going while the producer is active or the broker still hands out data.
            while (broker.continueProducing || data != null)
            {
                // A timed-out poll returns null; skip it instead of "processing" nothing.
                if (data != null)
                {
                    Thread.sleep(1000); // simulate slow processing
                    System.out.println("Consumer " + this.name + " processed data from broker: " + data);
                }

                data = broker.get();
            }

            System.out.println("Consumer " + this.name + " finished its job; terminating.");
        }
        catch (InterruptedException ex)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            ex.printStackTrace();
        }
    }
}

This is again a simple class that reads an Integer and prints it on the console. However, key points to note are:
  1. The processing loop keeps running as long as either of two conditions holds: the producer is still producing, or the broker has just returned some data.
  2. Again, Thread.sleep is used to simulate processing time, and you can vary it to create different scenarios.

The Broker

package com.kapil.techieforever.producerconsumer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Broker
{
    // Bounded FIFO queue shared by the producer and the consumers.
    public final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);

    // volatile so that the consumer threads see the producer's update.
    public volatile boolean continueProducing = true;

    // Blocks while the queue is full.
    public void put(Integer data) throws InterruptedException
    {
        this.queue.put(data);
    }

    // Waits up to one second for data; returns null on timeout.
    public Integer get() throws InterruptedException
    {
        return this.queue.poll(1, TimeUnit.SECONDS);
    }
}

The very first thing to note is that we are using ArrayBlockingQueue as the data holder. I am not going to explain what it does, but I urge you to read about it in the JavaDocs. I will say, however, that the producers place data in the queue and the consumers fetch it from the queue in FIFO order. If the producers are slow, the consumers wait for data to come in, and if the array is full, the producers wait for space to free up.
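The FIFO order and the waiting on both sides can be seen in a small standalone sketch. It uses a queue with a capacity of 1 so that the producer can never get more than one item ahead; the names `FifoHandoffDemo` and `handOff` are invented for this example, and it uses the blocking `put` and `take` methods rather than the Broker class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original project.
class FifoHandoffDemo {

    static List<Integer> handOff(int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i); // waits whenever the single slot is occupied
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < items; i++) {
                received.add(queue.take()); // waits whenever the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(handOff(5)); // items arrive in the order they were put
    }
}
```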

Also, note that I am using the queue's timed "poll" method instead of the blocking "take" method (ArrayBlockingQueue has no "get"). This ensures that the consumers do not wait forever: the wait times out after one second and poll returns null. This is what lets the consumers notice that the producer is done and terminate once all the data is processed. (Note: try replacing poll with take and you will see some interesting outputs.)
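The timeout behaviour of poll is easy to check in isolation. This is a minimal sketch with an invented helper (`PollTimeoutDemo.pollBriefly`) and a 200 ms timeout chosen only to keep the demo quick; the Broker above waits one second.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
class PollTimeoutDemo {

    // Returns the head of the queue, or null if nothing arrives within 200 ms.
    static Integer pollBriefly(BlockingQueue<Integer> queue) {
        try {
            return queue.poll(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);
        queue.offer(42);
        System.out.println(pollBriefly(queue)); // the queued value
        System.out.println(pollBriefly(queue)); // null, because the queue stayed empty
    }
}
```

A call to `take()` on the empty queue would instead block until some other thread adds an element, which is why a consumer built on it needs another way to be told to stop.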

Code

I have the code sitting on Google project hosting. Feel free to go across and download it from there. It is essentially an Eclipse (Spring STS) project. Depending on when you download it, you may also get additional packages and classes. Feel free to look into those too and share your comments.
 - You can browse the source code on the SVN browser, or
 - You can download it from the project itself

 

From http://scratchpad101.com/2011/08/22/concurrency-pattern-producer-consumer/
