C# 8 is just around the corner, and with that, this post reviews the last of the currently publicly available features of the new language version: asynchronous streams. If you are interested in my take on the other two currently available features, check out my earlier posts on them.

What is an async stream?

In the current language version, a method has to satisfy a handful of requirements before it can be marked async. The return type must be void, or a type with an accessible GetAwaiter() method that returns an object implementing a special interface. Task is just one example of this; ValueTask is another return type you can already use for async methods.

Also in the current language version, you can create iterators. An iterator is a method that uses the yield return construct. A method like this must return IEnumerable<T> or IEnumerator<T> (or one of their non-generic counterparts). And here lies the problem: these interfaces cannot be used as the return type of an async method, so you cannot create async iterators. And since the method itself cannot be async, you cannot use the await keyword inside it either, which makes calling and processing async code extremely difficult in these cases.

In the original announcement post, Mads Torgersen describes the new feature with a very simple, and not at all real-life, scenario:

static async IAsyncEnumerable<string> GetNamesAsync()
{
  string[] names = { "Archimedes", "Pythagoras", "Euclid", "Socrates", "Plato" };
  foreach (var name in names)
  {
    await Task.Delay(100);
    yield return name;
  }
}

And then, you can use it like this:

await foreach (var name in GetNamesAsync())
{
  // process name
}

The example shows the new language feature and the new .NET components and how they can be used. We have a new type, IAsyncEnumerable<T> which is compatible with both the yield return and the async language requirements, meaning that now we can use the await keyword when creating iterators. The feature definitely looks cool, but my first question was immediately how I would use this. And then it hit me.

Using IAsyncEnumerable with the Azure Storage SDK

Whenever you want to list all the blobs of an Azure blob container (or all the containers of a storage account, or basically anything that can be listed), you are making a network call. The SDK is just a wrapper around the REST interface of the storage account, so you are doing an I/O operation. And as such, you should always use asynchronous method calls. Fortunately, the SDK meets this requirement and gives you a method that you can use to list blobs asynchronously.

But the API itself has a design principle: every API call returns only a portion of the results. So if you want to list all the containers, you have to issue multiple asynchronous calls to the API through the SDK. With the actual results you also get back a token that you can attach to the next call, so the API knows where to continue serving the data from. To be honest, I have always found this to be a bit tiresome, so I have this handy little extension method that does this for me:

public static async Task<IEnumerable<IListBlobItem>> ListBlobsAsync(this CloudBlobContainer container)
{
  List<IListBlobItem> results = new List<IListBlobItem>();
  BlobContinuationToken token = null;
  do
  {
    var result = await container.ListBlobsSegmentedAsync(token);
    token = result.ContinuationToken;
    results.AddRange(result.Results);
  }
  while (token != null);
  return results;
}

And then, I can enumerate the results like this:

CloudStorageAccount acc = CloudStorageAccount.Parse("connectionStringHere");
var blobClient = acc.CreateCloudBlobClient();
var container = blobClient.GetContainerReference("myContainer");
foreach (var blob in await container.ListBlobsAsync())
{
  // Process blob
}

This is kinda OK. When the ListBlobsAsync() method is called, there is no blocking on the calling thread. The method issues the required number of async calls one after the other, and when everything is done, it returns. But there is one catch: while the method returns an IEnumerable<T>, in reality, it is a List<T>. So while there is no blocking in the sense that no thread sits idle waiting for the data to download, the iteration of the blobs cannot start until all of them are downloaded. Again, in the meantime, my CPU cycles are free for other, meaningful purposes, so this is not that big of a deal. But how cool would it be if the code could start evaluating the first bunch of results after the first async call instead of having to wait for all the calls to complete? And now, with the async streams feature, this is possible:

public static async IAsyncEnumerable<IListBlobItem> ListBlobsAsync(this CloudBlobContainer container)
{            
  BlobContinuationToken token = null;
  do
  {
    var result = await container.ListBlobsSegmentedAsync(token);
    token = result.ContinuationToken;
    foreach (var blob in result.Results)
    {
      yield return blob;
    }
  }
  while (token != null);            
}

This version of the code does one call to download a bunch of blobs. This is done asynchronously, so no CPU cycles are wasted waiting. Cool. But then, when the first bunch of results is downloaded, the code actually starts enumerating it with yield return, handing the items one by one to the calling method! And then, when all the results of that call are returned, execution continues on the next iteration with another asynchronous call, whose results are again iterated and streamed back to the caller. And now I can write code like this:

CloudStorageAccount acc = CloudStorageAccount.Parse("connectionStringHere");
var blobClient = acc.CreateCloudBlobClient();
var container = blobClient.GetContainerReference("myContainer");
await foreach (var blob in container.ListBlobsAsync())
{
  // Process blob
}
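If you want to see the streaming behaviour without a storage account, here is a minimal sketch that fakes a paged API. Everything in it (the Page class, GetPageAsync, the three-items-per-page layout) is made up for illustration and has nothing to do with the real SDK; the only point is that the loop over the pages has the same shape as the extension method above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for one segment returned by a paged API.
public sealed class Page
{
    public IReadOnlyList<int> Items { get; set; }
    public int? NextToken { get; set; }   // null means there are no more pages
}

public static class PagedDemo
{
    // Hypothetical paged call: three items per call, three pages in total.
    public static async Task<Page> GetPageAsync(int? token)
    {
        await Task.Delay(10);             // simulate network latency
        int page = token ?? 0;
        Console.WriteLine($"fetched page {page}");
        return new Page
        {
            Items = new[] { page * 3, page * 3 + 1, page * 3 + 2 },
            NextToken = page < 2 ? page + 1 : (int?)null
        };
    }

    // Same shape as the blob listing: await one page, stream its items, repeat.
    public static async IAsyncEnumerable<int> ListAllAsync()
    {
        int? token = null;
        do
        {
            Page page = await GetPageAsync(token);
            token = page.NextToken;
            foreach (int item in page.Items)
            {
                yield return item;
            }
        }
        while (token != null);
    }

    public static async Task Main()
    {
        await foreach (int item in ListAllAsync())
        {
            Console.WriteLine($"processing {item}");
            if (item == 4) break;         // stop early: the third page is never requested
        }
    }
}
```

The "fetched page" lines are interleaved with the "processing" lines, and because the loop breaks at item 4, the third page is never fetched at all. With the List<T>-based version, all three pages would have been downloaded before the first item was processed.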

Checking the internals

This is a very interesting new feature, so I thought it was worth looking into the internals a little bit. So the new interface looks like this:

public interface IAsyncEnumerable<out T>
{
  IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}

This is basically the direct counterpart of the synchronous interface. The IAsyncEnumerator<T> looks like this:

public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
  T Current { get; }
  ValueTask<bool> MoveNextAsync();
}

This is also the async counterpart of the synchronous version. But MoveNextAsync returns ValueTask<bool> instead of the regular Task<bool>, which is a nice touch. After all, this async iterator will most probably be consumed right after it is created, with a foreach, and not awaited multiple times or stored in a dictionary or whatever, so ValueTask<bool> can be a good choice. And of course, since the foreach calls MoveNextAsync() every time it steps forward, having a regular Task<bool> could mean a lot of allocations. With ValueTask<bool>, these allocations can be avoided, in particular when the next item is already available and the call completes synchronously, and this makes it a lot more efficient.
IAsyncDisposable is again just the async counterpart of IDisposable.
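To make the two interfaces a bit more tangible, here is a rough sketch of what an await foreach boils down to when you drive the enumerator by hand. This is my simplified approximation, not the exact code the compiler generates, and CountAsync and SumManuallyAsync are just illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ManualEnumerationDemo
{
    // Illustrative async iterator that yields 0 .. count-1.
    public static async IAsyncEnumerable<int> CountAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();           // force an asynchronous hop
            yield return i;
        }
    }

    // Roughly what "await foreach (var x in source) sum += x;" does.
    public static async Task<int> SumManuallyAsync(IAsyncEnumerable<int> source)
    {
        int sum = 0;
        IAsyncEnumerator<int> enumerator = source.GetAsyncEnumerator();
        try
        {
            // Each step awaits a ValueTask<bool>; false means the stream is done.
            while (await enumerator.MoveNextAsync())
            {
                sum += enumerator.Current;
            }
        }
        finally
        {
            // The IAsyncDisposable part: cleanup is awaited as well.
            await enumerator.DisposeAsync();
        }
        return sum;
    }

    public static async Task Main()
    {
        Console.WriteLine(await SumManuallyAsync(CountAsync(4)));  // prints 6
    }
}
```

Note that each ValueTask<bool> is awaited exactly once, right where it is produced, which is the usage pattern ValueTask is meant for.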

Also, we are encouraged to use ConfigureAwait(false) on tasks whenever we create class libraries, to avoid capturing the source synchronization context. And I do see this feature being used a lot in class libraries, so we need a way to do the same for the iterator as well. Thankfully, we get a handy little extension method to do just that:

await foreach (var blob in container.ListBlobsAsync().ConfigureAwait(false))
{ }

And we also get cancellation with an extension method, so that's fun :) I'm not really sure how that works (what does cancellation mean: cancelling the iteration, cancelling the current iteration and moving to the next one, can the iterator be reused after cancellation and if so, how is disposal handled etc), but will definitely look into it, because I do like this feature.
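As a starting point, here is a minimal sketch of one way cancellation can be wired up. It assumes the WithCancellation extension method and the [EnumeratorCancellation] attribute as they exist in the released .NET Core 3.0 (the preview bits may differ), and TicksAsync is a made-up iterator for illustration. It only covers the simplest case, cancelling the whole iteration, and does not answer the reuse and disposal questions above.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Illustrative endless stream. The attribute asks the compiler to flow the
    // token passed to GetAsyncEnumerator (via WithCancellation) into this parameter.
    public static async IAsyncEnumerable<int> TicksAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int i = 0; ; i++)
        {
            // Task.Delay observes the token and throws once it is cancelled.
            await Task.Delay(50, cancellationToken);
            yield return i;
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        int seen = 0;
        try
        {
            await foreach (int tick in TicksAsync().WithCancellation(cts.Token))
            {
                seen++;
                if (seen == 3) cts.Cancel();   // request cancellation after three items
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"cancelled after {seen} items");
        }
    }
}
```

In this sketch, cancellation ends the whole iteration: the next MoveNextAsync call fails with an OperationCanceledException, which surfaces in the await foreach. The iterator body still has to observe the token itself, here by passing it to Task.Delay.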

Verdict

This is a very cool feature. Something that really adds to the programming language and the platform. It will definitely take some time to get used to this feature and find the right places where it can be used. Of course, this new feature will only be available if you target .NET Core. While I'm a bit sad, I totally understand, and I can see that a lot of changes must have been made to the runtime to make this work. After all, onwards and upwards!

Countdown to C# 8: Making the Azure Storage API better with IAsyncEnumerable

© 2019. All Rights Reserved.
