DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
  1. DZone
  2. Data Engineering
  3. Databases
  4. Using BlockingCollection To Communicate Between Threads

Using BlockingCollection To Communicate Between Threads

Mike Hadlow user avatar by
Mike Hadlow
·
Dec. 02, 12 · Interview
Like (1)
Save
Tweet
Share
13.18K Views

Join the DZone community and get the full member experience.

Join For Free
Consider these (somewhat) common programming challenges:
  • I’m using a third party library that is not thread safe, but I want my application to share work between multiple threads. How do I marshal calls between my multi-threaded code to the single threaded library?
  • I have a single source of events on a single thread, but I want to share the work between a pool of multiple threads?
  • I have multiple threads emitting events, but I want to consume them on a single thread?

One way of doing this would be to have some shared state, a field or a property on a static class, and wrap locks around it so that multiple threads can access it safely. This is a pretty common way of trying to skin this particular cat, but it’s shot through with traps for the unwary. Also, it can hurt performance because access to the shared resource is serialized, even though the things accessing it are running in parallel.

A better way is to use a BlockingCollection and have your threads communicate via message classes.

BlockingCollection is a class in the new System.Collections.Concurrent namespace that arrived with .NET 4.0. It contains a ConcurrentQueue, although you can swap this for a ConcurrentStack or a ConcurrentBag if you want. You push objects in at one end and sit in a loop consuming them from the other. The (multiple) producer(s) and (multiple) consumer(s) can be running on different threads without any locks. That’s OK because the Concurrent namespace collection classes are guaranteed to be thread safe. The ‘blocking’ part of the name is there because the consuming end blocks until an object is available. Justin Etheredge has an excellent post that looks at BlockingCollection in more detail here.

For an example, let’s implement a parallel pipeline. A ventilator produces tasks to be processed in parallel, a set of workers process the tasks on separate threads, and a sink collects the results back together again. It shows both one-to-many and many-to-one thread communication. I’ve stolen the idea and the diagram from the excellent ZeroMQ Guide:

zguide_parallel_workers

First we’ll need a class that represents a piece of work, we’ll keep it super simple for this example:

public class WorkItem
{
    public string Text { get; set; }
}

We’ll need two BlockingCollections, one to take the tasks from the ventilator to the workers, and another to take the finished work from the workers to the sink:

var ventilatorQueue = new BlockingCollection<WorkItem>();
var sinkQueue = new BlockingCollection<WorkItem>();

Now let’s write our ventilator:

public static void StartVentilator(BlockingCollection<WorkItem> ventilatorQueue)
{
    Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < 100; i++)
        {
            ventilatorQueue.Add(new WorkItem { Text = string.Format("Item {0}", i) });
        }
    }, TaskCreationOptions.LongRunning);
}

It just iterates 100 times creating work items and pushing them on the ventilatorQueue.

Here is a worker:

public static void StartWorker(int workerNumber,
    BlockingCollection<WorkItem> ventilatorQueue,
    BlockingCollection<WorkItem> sinkQueue)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var workItem in ventilatorQueue.GetConsumingEnumerable())
        {
            // pretend to take some time to process
            Thread.Sleep(30);
            workItem.Text = workItem.Text + " processed by worker " + workerNumber;
            sinkQueue.Add(workItem);
        }
    }, TaskCreationOptions.LongRunning);
}

BlockingCollection provides a GetConsumingEnumerable method that yields each item in turn. It blocks if there are no items on the queue. Note that I’m not worrying about shutdown patterns in this code. In production code you’d need to worry about how to close down your worker threads.

Next let’s write our sink:

public static void StartSink(BlockingCollection<WorkItem> sinkQueue)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var workItem in sinkQueue.GetConsumingEnumerable())
        {
            Console.WriteLine("Processed Messsage: {0}", workItem.Text);
        }
    }, TaskCreationOptions.LongRunning);
}

Once again, this sits in an infinite foreach loop consuming items from the sinkQueue.

Finally we need to wire up the pieces and kick it off:

StartSink(sinkQueue);

StartWorker(0, ventilatorQueue, sinkQueue);
StartWorker(1, ventilatorQueue, sinkQueue);
StartWorker(2, ventilatorQueue, sinkQueue);

StartVentilator(ventilatorQueue);

I’ve started the sink first, then the workers and finally the producer. It doesn’t overly matter what order they start in since the queues will store any tasks the ventilator creates before the workers and the sink start.

Running the code I get output something like this:

Processed Messsage: Item 1 processed by worker 1
Processed Messsage: Item 2 processed by worker 0
Processed Messsage: Item 0 processed by worker 2
Processed Messsage: Item 5 processed by worker 2
Processed Messsage: Item 3 processed by worker 1

....

Processed Messsage: Item 95 processed by worker 0
Processed Messsage: Item 98 processed by worker 0
Processed Messsage: Item 97 processed by worker 2
Processed Messsage: Item 96 processed by worker 1
Processed Messsage: Item 99 processed by worker 0

This pattern is a great way of decoupling the communication between a source and a sink, or a producer and a consumer. It also allows you to have multiple sources and multiple sinks, but primarily it’s a safe way for multiple threads to interact.

The complete example is here on GitHub.

Relational database Task (computing) Shared resource Threading Event Blocks IT

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • The 31 Flavors of Data Lineage and Why Vanilla Doesn’t Cut It
  • Top 5 Node.js REST API Frameworks
  • How to Quickly Build an Audio Editor With UI
  • Data Mesh vs. Data Fabric: A Tale of Two New Data Paradigms

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends: