
Before C# 8.0 Async Streams Come Out

If you don’t want to wait until C# 8.0 comes out to take advantage of this functionality, there is an option available right now.

While C# 8.0 promises built-in support for async for-each enumerators (known as ‘Async Streams’), its release date is still unknown. If you don’t want to wait but think async streams would benefit your project, there is an option available right now.


Watch the explanation of Async Streams on Channel 9 with Mads Torgersen

If you already know why async streams are needed, you can skip this section.

First, let’s quickly look at the problem and how Async Streams solve it. Imagine a scenario where you query a database and process each data row, like this:

void ProcessDbRows()
{
  using (var dbContext = CreateDbContext(...))
  {
    var command = ...;
    using (var reader = command.ExecuteReader())
    {
      while (reader.Read())
      {
        var row = GetRowValues(reader);
        ProcessRow(row);
      }
    }
  }
}

The problem with the function above is that it combines multiple responsibilities (enumerating the database rows and processing them). The most elegant fix is to hide the row enumeration behind an abstraction using the IEnumerable<T> interface:

IEnumerable<Row> ProduceRows()
{
  using (var dbContext = CreateDbContext(...))
  {
    var command = ...;
    using (var reader = command.ExecuteReader())
    {
      while (reader.Read())
      {
        var row = GetRowValues(reader);
        yield return row; // Produce an item.
      }
    }
  }
}

void ConsumeRows()
{
  var rowStream = ProduceRows();
  foreach (var row in rowStream)
    ProcessRow(row); // Consume an item.
}

But then you realize that your application’s performance can suffer, because this code makes blocking IO calls on worker threads instead of using async methods with await. Being able to write asynchronous code in the same style as shown above would be very handy, and that is exactly the problem Async Streams solve. However, existing libraries offer no friendly way to do this, except the one I’m going to share with you.


Back in February 2016, the ‘AsyncEnumerable’ library for C# was born. It was published as the AsyncEnumerator NuGet package, because the ‘AsyncEnumerable’ name was already taken.

The goal of this library is to make asynchronous enumerators as user-friendly as the built-in support for regular synchronous enumerators (IEnumerable<T> and its related extension methods).

Let’s look at the previous sample code which has been adapted to be asynchronous with the help of the AsyncEnumerable library:

using System.Collections.Async;

IAsyncEnumerable<Row> ProduceRows() =>  // <-- new interface
new AsyncEnumerable<Row>(async yield => // <-- helper class
{
  using (var dbContext = CreateDbContext(...))
  {
    var command = ...;
    using (var reader = await command.ExecuteReaderAsync()) // <-- async call
    {
      while (await reader.ReadAsync()) // <-- async call
      {
        var row = await GetRowValuesAsync(reader); // <-- async call
        await yield.ReturnAsync(row); // Produce an item asynchronously.
      }
    }
  }
});

async Task ConsumeRowsAsync() // <-- async method
{
  var rowStream = ProduceRows();
  await rowStream.ForEachAsync(async row => // <-- extension method
  {
    await ProcessRowAsync(row); // Consume an item asynchronously.
  });
}

The library introduces a new IAsyncEnumerable<T> interface, which is implemented by the helper class AsyncEnumerable<T>. Its constructor takes an async lambda that represents the enumerator body. Here, yield is simply the name of the lambda’s input parameter: yield is a contextual keyword in C# that only has special meaning in yield return and yield break statements, so the compiler accepts it as an ordinary identifier. That trick lets you write await yield.ReturnAsync(row), which reads very much like the built-in yield return statement. You could rename the parameter to anything else, but you would lose that readability.
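The trick is easier to see in isolation. The following is a minimal sketch that does not use the AsyncEnumerator package at all: YieldSink<T> and Producer<T> are hypothetical stand-ins written only for illustration, and unlike a real async enumerator this Producer<T> buffers every item instead of handing each one to the consumer as it is produced. What it does show is that yield compiles as a lambda parameter name and that await yield.ReturnAsync(i) is just an ordinary method call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the object passed into the producer lambda.
// Not the AsyncEnumerator library's type: it simply collects produced items.
public sealed class YieldSink<T>
{
    private readonly List<T> _items = new List<T>();
    public IReadOnlyList<T> Items => _items;

    // An ordinary async-shaped method; its name mimics `yield return`.
    public Task ReturnAsync(T item)
    {
        _items.Add(item);
        return Task.CompletedTask;
    }
}

// Hypothetical producer wrapper: runs the lambda body and returns what it produced.
// This sketch buffers everything rather than streaming items one at a time.
public sealed class Producer<T>
{
    private readonly Func<YieldSink<T>, Task> _body;
    public Producer(Func<YieldSink<T>, Task> body) => _body = body;

    public async Task<IReadOnlyList<T>> RunAsync()
    {
        var sink = new YieldSink<T>();
        await _body(sink);
        return sink.Items;
    }
}

public static class YieldNameDemo
{
    // `yield` is a contextual keyword, so it is a valid lambda parameter name here.
    public static Producer<int> Numbers(int count) =>
        new Producer<int>(async yield =>
        {
            for (var i = 0; i < count; i++)
            {
                await Task.Yield();          // simulate some asynchronous work
                await yield.ReturnAsync(i);  // reads like `yield return i;`
            }
        });

    public static async Task Main()
    {
        var items = await Numbers(3).RunAsync();
        Console.WriteLine(string.Join(", ", items)); // prints "0, 1, 2"
    }
}
```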

On the consuming side, the ForEachAsync extension method processes your asynchronous stream of items with a lambda function, which can be either synchronous or asynchronous. This syntax is not as clean as a language construct such as foreach await (var item in stream) would be, but it is the best I could come up with.
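As a point of comparison, here is a rough sketch of a ForEachAsync-style helper with one overload for synchronous and one for asynchronous lambdas, written against the IAsyncEnumerable<T> interface that later shipped with C# 8.0 (it assumes C# 8.0 and .NET Core 3.0 or later). The ForEachSketchAsync and CountTo names are hypothetical; this is not the AsyncEnumerator package’s implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ForEachSketch
{
    // Overload for an asynchronous per-item action: each call is awaited in turn.
    public static async Task ForEachSketchAsync<T>(
        this IAsyncEnumerable<T> source, Func<T, Task> action)
    {
        await foreach (var item in source)
            await action(item);
    }

    // Overload for a synchronous per-item action.
    public static async Task ForEachSketchAsync<T>(
        this IAsyncEnumerable<T> source, Action<T> action)
    {
        await foreach (var item in source)
            action(item);
    }

    // Hypothetical async source used only for the demo: yields 1..n.
    public static async IAsyncEnumerable<int> CountTo(int n)
    {
        for (var i = 1; i <= n; i++)
        {
            await Task.Delay(1); // simulate asynchronous I/O
            yield return i;
        }
    }

    public static async Task Main()
    {
        var sum = 0;
        await CountTo(4).ForEachSketchAsync(i => sum += i); // synchronous lambda
        await CountTo(2).ForEachSketchAsync(async i =>      // asynchronous lambda
        {
            await Task.Yield();
            Console.WriteLine(i);
        });
        Console.WriteLine(sum); // prints 10
    }
}
```

When both overloads apply, the compiler prefers Func<T, Task> for an async lambda, so the asynchronous overload is chosen and the lambda is awaited rather than run as async void.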

Another great feature of the library is support for combining ‘async’, ‘for-each’, and ‘parallel’, which lets you process items from an async stream in parallel with a given degree of concurrency. Here is an example based on the code block above:

using System.Collections.Async;

async Task ConsumeRowsAsync()
{
  var rowStream = ProduceRows();
  await rowStream.ParallelForEachAsync(
    async row =>
    {
      await ProcessRowAsync(row);
    },
    maxDegreeOfParalellism: 5,
    cancellationToken: default);
}

The maxDegreeOfParalellism argument (yes, its name has a typo) controls how many items can be scheduled for processing in parallel; however, the actual number depends on how many worker threads are available at the moment. This method greatly helps when you want to balance processing throughput against what the stream source can handle (you don’t want to get throttled). So next time you see a suggestion to use Task.WhenAll for this, please don’t: calling Task.WhenAll over one task per item starts all of them at once, with no upper bound on concurrency.
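Here is a sketch of the bounded-concurrency idea using only the base class library. It swaps in SemaphoreSlim as the throttle, which is a common pattern but not necessarily how the AsyncEnumerator package implements ParallelForEachAsync, and it records the peak number of items in flight so you can check that the limit holds. ProcessAllAsync and maxConcurrency are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedParallelSketch
{
    // Runs `body` for every item, with at most `maxConcurrency` bodies in flight.
    // Returns the peak number of bodies that ran at the same time.
    public static async Task<int> ProcessAllAsync<T>(
        IEnumerable<T> items, Func<T, Task> body, int maxConcurrency)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);
        var sync = new object();
        var inFlight = 0;
        var peak = 0;

        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync(); // wait for a free slot
            try
            {
                lock (sync)
                {
                    inFlight++;
                    peak = Math.Max(peak, inFlight);
                }
                await body(item);
            }
            finally
            {
                lock (sync) { inFlight--; }
                gate.Release(); // hand the slot to the next waiting item
            }
        }).ToList(); // materialize so every item is started exactly once

        // WhenAll is fine here: the semaphore, not WhenAll, bounds concurrency.
        await Task.WhenAll(tasks);
        return peak;
    }

    public static async Task Main()
    {
        var peak = await ProcessAllAsync(
            Enumerable.Range(1, 20),
            async _ => await Task.Delay(10), // simulated async work per item
            maxConcurrency: 5);
        Console.WriteLine($"Peak concurrency: {peak}"); // never more than 5
    }
}
```

Unlike a stream-based ParallelForEachAsync, this version creates one task per item up front (each waiting for a free slot), so it suits in-memory collections better than endless or very large streams.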


We (at the company where I currently work) have been using this library heavily for the last couple of years in the core of one of our projects, which saved us a lot of development and maintenance effort compared to our previous approach. I’d like to share a couple of real examples with you.

The first example is an extension method for Azure’s CloudTable class that streams all table rows matching the query criteria:

namespace Microsoft.WindowsAzure.Storage.Table
{
    // Needs the AsyncEnumerator NuGet package:
    // https://www.nuget.org/packages/AsyncEnumerator
    using System.Collections.Async;

    public static class CloudTableExtensions
    {
        /// <summary>
        /// Creates an async stream of items from given query.
        /// </summary>
        public static IAsyncEnumerable<T> Stream<T>(
            this CloudTable table, TableQuery<T> query)
            where T : ITableEntity, new() =>
            new AsyncEnumerable<T>(async yield =>
            {
                TableContinuationToken continuationToken = null;
                do
                {
                    var segment = await table.ExecuteQuerySegmentedAsync(
                        query, continuationToken, yield.CancellationToken)
                        .ConfigureAwait(false);

                    foreach (var item in segment.Results)
                        await yield.ReturnAsync(item).ConfigureAwait(false);

                    continuationToken = segment.ContinuationToken;
                }
                while (continuationToken != null);
            });
    }
}

The Azure Cloud Table service returns at most 1,000 rows per request, so this extension method hides that limitation by executing the query segment by segment and presenting the results as one continuous stream.
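The same continuation-token pattern can be tried without an Azure account. The sketch below is a hypothetical in-memory stand-in: FakePagedSource returns at most pageSize items per call plus a continuation token (here just the next offset, or null when done), and StreamAll flattens the pages into one async stream. It uses C# 8.0’s built-in async iterators rather than the AsyncEnumerator package, so it assumes .NET Core 3.0 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical paged data source, standing in for a segmented table query.
public sealed class FakePagedSource
{
    private readonly int[] _data;
    private readonly int _pageSize;

    public FakePagedSource(int count, int pageSize)
    {
        _data = Enumerable.Range(0, count).ToArray();
        _pageSize = pageSize;
    }

    public int Requests { get; private set; }

    // Returns one page and the token for the next page (null when exhausted).
    public async Task<(int[] Items, int? Next)> GetPageAsync(int? token, CancellationToken ct)
    {
        await Task.Delay(1, ct); // simulate a network round-trip
        Requests++;
        var start = token ?? 0;
        var page = _data.Skip(start).Take(_pageSize).ToArray();
        int? next = start + page.Length < _data.Length ? start + page.Length : (int?)null;
        return (page, next);
    }
}

public static class PagedStream
{
    // Hides the paging: callers see one continuous stream of items.
    public static async IAsyncEnumerable<int> StreamAll(
        FakePagedSource source,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        int? token = null;
        do
        {
            var (items, next) = await source.GetPageAsync(token, ct);
            foreach (var item in items)
                yield return item;
            token = next;
        }
        while (token != null);
    }

    public static async Task Main()
    {
        var source = new FakePagedSource(count: 2500, pageSize: 1000);
        var total = 0;
        await foreach (var item in StreamAll(source))
            total++;
        Console.WriteLine($"{total} items in {source.Requests} requests"); // prints "2500 items in 3 requests"
    }
}
```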

Another example shows how to convert an Azure Storage Queue into a continuous stream of messages:

namespace Microsoft.WindowsAzure.Storage.Queue
{
    // Needs the AsyncEnumerator NuGet package:
    // https://www.nuget.org/packages/AsyncEnumerator
    using System;
    using System.Collections.Async;
    using System.Threading.Tasks;

    public static class CloudQueueExtensions
    {
        /// <summary>
        /// Creates an async stream of queue messages.
        /// </summary>
        public static IAsyncEnumerable<CloudQueueMessage> Stream(
            this CloudQueue queue,
            TimeSpan invisibleTime,
            TimeSpan idleTime,
            int batchSize) =>
            new AsyncEnumerable<CloudQueueMessage>(async yield =>
            {
                while (!yield.CancellationToken.IsCancellationRequested)
                {
                    var messages = await queue.GetMessagesAsync(
                        batchSize,
                        invisibleTime,
                        null,
                        null,
                        yield.CancellationToken)
                        .ConfigureAwait(false);

                    var anyMessage = false;

                    foreach (var message in messages)
                    {
                        await yield.ReturnAsync(message)
                            .ConfigureAwait(false);

                        anyMessage = true;
                    }

                    if (!anyMessage)
                    {
                        await Task.Delay(idleTime, yield.CancellationToken)
                            .ConfigureAwait(false);
                    }
                }
            });
    }
}

If the message queue is empty, the async stream will keep waiting for the next one to appear until the caller (consumer) cancels the enumeration with a cancellation token.

Both examples are deliberately basic and omit error handling for the sake of clarity.
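The queue-polling loop can also be exercised locally. In this sketch a ConcurrentQueue<T> stands in for the storage queue (an assumption for illustration), the stream waits idleTime whenever a poll returns nothing, and it ends only when the consumer’s cancellation token fires. It uses C# 8.0 async iterators instead of the AsyncEnumerator package; PollForever is a hypothetical name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class PollingStream
{
    // Endless stream: each poll takes up to `batchSize` messages, and an empty
    // poll waits `idleTime` before trying again. It ends only on cancellation.
    public static async IAsyncEnumerable<string> PollForever(
        ConcurrentQueue<string> queue,
        TimeSpan idleTime,
        int batchSize,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        while (!ct.IsCancellationRequested)
        {
            var received = 0;
            while (received < batchSize && queue.TryDequeue(out var message))
            {
                received++;
                yield return message;
            }

            if (received == 0)
                await Task.Delay(idleTime, ct); // throws TaskCanceledException if cancelled
        }
    }

    public static async Task Main()
    {
        var queue = new ConcurrentQueue<string>(new[] { "a", "b", "c" });
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
        var received = new List<string>();
        try
        {
            await foreach (var msg in PollForever(queue, TimeSpan.FromMilliseconds(20), 2, cts.Token))
                received.Add(msg);
        }
        catch (OperationCanceledException)
        {
            // Expected: the consumer cancelled the otherwise endless stream.
        }
        Console.WriteLine(string.Join(",", received)); // prints "a,b,c"
    }
}
```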


Besides asynchronous enumeration, the library also provides useful extension methods for converting IEnumerable<T> to IAsyncEnumerable<T>, as well as async counterparts of most standard LINQ functions, such as Select, GroupBy, and SingleAsync, plus extras like Batch.
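To illustrate what such operators look like, here is a hypothetical sketch of two of them written against C# 8.0’s IAsyncEnumerable<T>: ToAsyncSketch wraps an ordinary sequence, and BatchSketch groups a stream into fixed-size lists. These are not the AsyncEnumerator package’s implementations, and their names and signatures are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncOperatorSketches
{
    // Wraps a synchronous sequence as an async stream.
    public static async IAsyncEnumerable<T> ToAsyncSketch<T>(this IEnumerable<T> source)
    {
        foreach (var item in source)
        {
            await Task.Yield(); // make each step genuinely asynchronous
            yield return item;
        }
    }

    // Groups consecutive items into lists of at most `size` elements.
    public static async IAsyncEnumerable<List<T>> BatchSketch<T>(
        this IAsyncEnumerable<T> source, int size)
    {
        // Checked lazily, when iteration starts (iterator semantics).
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));

        var batch = new List<T>(size);
        await foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size);
            }
        }
        if (batch.Count > 0)
            yield return batch; // trailing partial batch
    }

    public static async Task Main()
    {
        await foreach (var batch in new[] { 1, 2, 3, 4, 5 }.ToAsyncSketch().BatchSketch(2))
            Console.WriteLine(string.Join(",", batch)); // prints "1,2", then "3,4", then "5"
    }
}
```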

When C# 8.0 comes out, the library will continue to exist with all of its extension methods, but the core enumerator will be integrated into .NET itself; upgrading your code should be straightforward thanks to the high similarity in syntax.
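For comparison, this is roughly how the producer/consumer pair from earlier looks with the language support that C# 8.0 eventually shipped (async methods returning IAsyncEnumerable<T> with yield return, consumed with await foreach). Since a database is not available in a self-contained snippet, the hypothetical Row class and the Task.Delay calls stand in for GetRowValuesAsync, ReadAsync, and ProcessRowAsync. It assumes C# 8.0 and .NET Core 3.0 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical row type standing in for a database row.
public sealed class Row
{
    public Row(int id) => Id = id;
    public int Id { get; }
}

public static class NativeAsyncStreams
{
    // Producer: `yield return` now works directly inside an async method.
    public static async IAsyncEnumerable<Row> ProduceRows(int count)
    {
        for (var id = 1; id <= count; id++)
        {
            await Task.Delay(1);      // stands in for `await reader.ReadAsync()`
            yield return new Row(id); // replaces `await yield.ReturnAsync(row)`
        }
    }

    // Consumer: `await foreach` replaces the ForEachAsync extension method.
    public static async Task<int> ConsumeRowsAsync(int count)
    {
        var processed = 0;
        await foreach (var row in ProduceRows(count))
        {
            await Task.Delay(1);      // stands in for `await ProcessRowAsync(row)`
            processed++;
        }
        return processed;
    }

    public static async Task Main() =>
        Console.WriteLine(await ConsumeRowsAsync(3)); // prints 3
}
```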

The project is open-source, available on GitHub and NuGet.org, and distributed under the MIT license.

Published at DZone with permission of
