
.NET 4.5 Baby Steps, Part 3: IDataflowBlock


Introduction

A new interface in .NET 4.5 is IDataflowBlock, which has many interesting implementations, so we will start with the simplest one. ActionBlock<TInput> is a completely new class in .NET 4.5 that provides a way of working with data in a very task-oriented way. I simplistically think of it as an implementation of the IObserver<T> interface we got in .NET 4, but do not limit your thinking to just that.

To use it, you must first add a reference to System.Threading.Tasks.Dataflow.


This first example is a fairly simple pub/sub demo:

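using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The block runs this delegate once for each message posted to it.
var subscriber = new ActionBlock<string>(input =>
    {
        Console.WriteLine("Got: {0}", input);
    });

// Ten publishers, each posting one timestamp after a random delay.
for (int i = 0; i < 10; i++)
{
    Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            subscriber.Post(DateTime.Now.ToLongTimeString());
        });
}

// Keep the process alive while the background tasks run.
Console.ReadLine();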

As IObserver<T>

So the first fantastic feature is that it can act as an IObserver<T> (via the AsObserver extension method), which removes the need to build your own subscriber classes when implementing a pub/sub model.

First is the code for the publisher class. This is a normal IObservable<T> implementation, as we had in .NET 4, which means our new code plays well with our existing code.

public class Publisher : IObservable<string>
{
    List<IObserver<string>> subscribers = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        subscribers.Add(observer);
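        // Demo shortcut: a production implementation should return an
        // IDisposable that removes the observer when disposed.
        return null;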
    }

    public void Send()
    {
        foreach (var item in subscribers)
        {
            item.OnNext(DateTime.Now.ToLongTimeString());
        }
    }
}

The demo code, which produces the same output as the first example:

var publisher = new Publisher();

var subscriber = new ActionBlock<string>(input =>
    {
        Console.WriteLine("Got: {0}", input);
    });

publisher.Subscribe(subscriber.AsObserver());

for (int i = 0; i < 10; i++)
{
    Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            publisher.Send();
        });
}

// Keep the process alive while the background tasks run.
Console.ReadLine();
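
Because AsObserver hands back an ordinary IObserver<T>, the publisher can also signal that it is finished. The sketch below is my own illustration rather than part of the attached demos: the class name ObserverCompletionDemo and its Run method are hypothetical, and it relies on my understanding that calling OnCompleted on the wrapper completes the underlying block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not from the original article's download.
public static class ObserverCompletionDemo
{
    // Pushes two values through the observer wrapper, signals completion,
    // and returns what the block received.
    public static List<string> Run()
    {
        var received = new List<string>();

        // With default options the block handles one message at a time,
        // in order, so the list does not need a lock here.
        var subscriber = new ActionBlock<string>(input => received.Add(input));

        IObserver<string> observer = subscriber.AsObserver();

        observer.OnNext("hello");
        observer.OnNext("world");

        // Assumption: OnCompleted on the wrapper completes the block.
        observer.OnCompleted();

        // Wait until the block has finished everything it accepted.
        subscriber.Completion.Wait();

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Returning the list from Run keeps the sketch easy to check; in a real publisher you would call OnCompleted on every subscriber when the source shuts down.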

Complete

The next awesome feature is the Complete method, which tells the block to stop accepting input. Anything already queued is still processed, and the block's Completion task finishes once that work is done. This is great for services that need to shut down cleanly.

This demo runs until you press Enter:

var subscriber = new ActionBlock<string>(input =>
{
    Console.WriteLine("Got: {0}", input);
});


Task.Factory.StartNew(() =>
{
    while (true)
    {

        Thread.Sleep(new Random().Next(200, 1000));
        subscriber.Post(DateTime.Now.ToLongTimeString());
    }
});

Console.WriteLine("Press Enter to stop input");
Console.ReadLine();
subscriber.Complete();
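
To make the effect of Complete easier to see without the timing noise, here is a small sketch of my own (the class name CompleteDemo and its Run method are hypothetical, not from the attached demos). It posts two items, calls Complete, tries a third Post, and then waits on the Completion task. Post returns a bool, so the late call reports that it was declined instead of throwing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not from the original article's download.
public static class CompleteDemo
{
    // Returns the items the block processed, plus whether a Post made
    // after Complete() was accepted.
    public static Tuple<List<string>, bool> Run()
    {
        var processed = new List<string>();

        // Default options process one message at a time, in order.
        var subscriber = new ActionBlock<string>(input => processed.Add(input));

        subscriber.Post("first");
        subscriber.Post("second");

        // Stop accepting new input; already queued items still run.
        subscriber.Complete();

        // Post reports whether the block accepted the item.
        bool lateAccepted = subscriber.Post("third");

        // Block until the queued work has been handled.
        subscriber.Completion.Wait();

        return Tuple.Create(processed, lateAccepted);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Processed: {0}", string.Join(", ", result.Item1));
        Console.WriteLine("Late post accepted: {0}", result.Item2);
    }
}
```

In the demo above the publishing loop keeps calling Post after you press Enter; those calls simply return false. A service would normally check that return value, or stop its producer before completing the block.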