.NET and me Coding dreams since 1998!

21Sep/120

Awaitable task progress reporting

In my previous Task.WhileAll post I had a very fishy piece of code in sample where I put a Thread to sleep in order to give enough time for async composition to complete so the test could pass.

Well, every time I use a Thread.Sleep anywhere I know it is a bad thing so I've decided to get rid of it.

IProgressAsync<T>

The problem is related to the fact that IProgress<T> interface defines a void handler which can't be awaited and thus it ruins composition.

That's why I decided to define my own "true async" version of the interface which looks has the same report method but returning a Task I can await.

    
namespace WhileAllTests
{
    using System.Threading.Tasks;

    public interface IProgressAsync<in T>
    {
        Task ReportAsync(T value);
    }
}

Having a async version of reporting is VERY useful when the subscriber itself is awaiting further calls. I could get that with async void but using async void IMHO always turns to be bad solution so I choose to use the Task returning signature even I need a custom made interface for that.

And here's the implementation

    
    using System;
    using System.Threading.Tasks;

    public class ProgressAsync<T> : IProgressAsync<T>
    {
        private readonly Func<T, Task> handler;

        public ProgressAsync(Func<T, Task> handler)
        {
            this.handler = handler;
        }

        public async Task ReportAsync(T value)
        {
            await this.handler.Invoke(value);
        }
    }

No magic here:

  • Instead of Action<T> my ctor accepts the Func<T, Task> so I can await it
  • ReportAsync awaits in async manner the provided Task  enabling composition

Now having this in place I can update my task extension method to compose reporting method invocation

public static async Task<IList> WhileAll(this IList<Task> tasks, IProgressAsync progress)
        {
            var result = new List(tasks.Count);
            var remainingTasks = new List<Task>(tasks);
            while (remainingTasks.Count > 0)
            {
                await Task.WhenAny(tasks);
                var stillRemainingTasks = new List<Task>(remainingTasks.Count - 1);
                for (int i = 0; i < remainingTasks.Count; i++)
                {
                    if (remainingTasks[i].IsCompleted)
                    {
                        result.Add(remainingTasks[i].Result);
                        await progress.ReportAsync(remainingTasks[i].Result);
                    }
                    else
                    {
                        stillRemainingTasks.Add(remainingTasks[i]);
                    }
                }

                remainingTasks = stillRemainingTasks;
            }

            return result;
        }

With all this in place I can remove thread sleep from my unit tests and have it more useful

 

[TestClass]
public class UnitTest1 {

Listresult = new List();

[TestMethod]
    [TestClass]
    public class UnitTest1 {

        List<int> result = new List<int>();

        [TestMethod]
        public async Task TestMethod1() {

            var task1 = Task.Run(() => 101);
            var task2 = Task.Run(() => 102);
            var tasks = new List<Task<int>>() { task1, task2 };

            var listener = new ProgressAsync<int>(this.OnProgressAsync);
            var actual = await tasks.WhileAll(listener);

            Assert.AreEqual(2, this.result.Count);
            Assert.IsTrue(this.result.Contains(101));
            Assert.IsTrue(this.result.Contains(102));

            Assert.AreEqual(2, actual.Count);
            Assert.IsTrue(actual.Contains(101));
            Assert.IsTrue(actual.Contains(102));
        }

       private async Task OnProgressAsync(int arg) { result.Add(arg); }
    }

There you go, updated code of Task.WhileAll can be found here

Filed under: Development No Comments
20Sep/120

Task.WhileAll

When Task.WhenAll met the Task.WhenAny…

There are two methods which can be used for awaiting an array of tasks in non blocking manner: Task.WhenAll and Task.WhenAny.

It is quite obvious how they work:

  • WhenAll completes when every task is completed,
  • WhenAny when any of the task is completed.

I needed today something which is a mix of those two and I’ve came up with something which completes when all is awaited but also provides hook for me to respond on awaits of individual tasks.

I’ve called my little extension: Task.WhileAll.

Extension method

Here's the complete implementation

    
public static class TaskExtensions 
{
	public static async Task<IList<T>> WhileAll<T>(this IList<Task<T>> tasks, IProgress<T> progress) 
	{
		var result = new List<T>(tasks.Count);
		var done = new List<Task<T>>(tasks);
		while(done.Count > 0) 
		{
			await Task.WhenAny(tasks);
			var spinning = new List<Task<T>>(done.Count - 1);
			for(int i = 0; i < done.Count; i++) 
			{
				if(done[i].IsCompleted) 
				{
					result.Add(done[i].Result);
					progress.Report(done[i].Result);
				} else {
					spinning.Add(done[i]);
				}
			}

			done = spinning;
		}

		return result;
	}
}

 

The code is quire simple:

  • it is an async extension method extending the IList<Task<T>>
  • method returns on completition IList<T> (result of awaited tass)
  • Method accepts IProgress<T> interface publishing information about the tasks who had just completed to the interested subscribers
  • Inside of the method body we have a loop which is active as long there are tasks with IsCompleted==false.
  • The loop has a “sleep” line where I use await Task.WhenAny to asynchronuslly wait for any task to complete

Unit test

Here’s a simple unit test ilustrating the usage of the extension method

  
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    using Microsoft.VisualStudio.TestTools.UnitTesting;

    [TestClass]
    public class UnitTest1 {
        [TestMethod]
        public async Task TestMethod1() {

            var task1 = Task.Run(() => 101);
            var task2 = Task.Run(() => 102);
            var tasks = new List<Task<int>>() { task1, task2 };


            List<int> result = new List<int>();
            var listener = new Progress<int>(
                taskResult => {
                    result.Add(taskResult);
                });

            var actual = await tasks.WhileAll(listener);
            Thread.Sleep(50); // wait a bit for progress reports to complete

            Assert.AreEqual(2, result.Count);
            Assert.IsTrue(result.Contains(101));
            Assert.IsTrue(result.Contains(102));

            Assert.AreEqual(2, actual.Count);
            Assert.IsTrue(actual.Contains(101));
            Assert.IsTrue(actual.Contains(102));
        }
    }

Again, nothing complicated :

  • I create an array of two dumb tasks
  • I define the listner which takes the task result and (for the unit test needs) adds it to the collection of results
  • I await the extension methods on task array with provided listner
  • Wait for 50 ms so the progress report would have time to finish (code is async after all)
  • Check that both tasks were reported to subscriber
  • Check that both tasks are returned as result.

Conclussion

That’s it. Dead simple code which I use A LOT to increase CPU utilization of my web crawlers.

Source code of the extension method and unit test can be found here.

Filed under: Development No Comments
21Jan/120

What is new in WCF in .NET 4.5 – Task and async

.NET 4.5 WCF – unit testable out of the box

As I mentioned already in How to get testable WCF code in simplest way? problem with abstracting WCF services occur due to the fact that the service contract is by definition not containing the async members defined and every solution I’ve seen enabling asynchronous calls to a WCF service adds a certain level of complexity to the code base so therefore I have chosen to use service generated proxy enhanced with some T4 magic creating appropriate interfaces.

I am happy to report that is not true any more and that

WCF in .NET 4.5 enables VERY easy asynchronous service calls in a testable manner out of the box.

Here’s a source code of the simplest illustration of what is the point. Usual constraints: works-for-me and take-it-as-an-idea-only.

In case all this async, Task<T> C# 5.0 things are new for you I suggest checking out some of presentations from my TPL delicious stack (especially “Future Directions For C#…” one there)

Server side code

Let’s stick to the simplest possible sample of a vanilla WCF service having a single operation returning a server time

using System;
using System.ServiceModel;
using System.Threading.Tasks;

namespace WcfService1
{
    [ServiceContract]
    public interface ITestService
    {
        [OperationContract]
        Task<DateTime> GetServerTimeAsync();
    }
}

As you can notice there are two interesting moments in the contact definition:

  • Returning type is not DateTime - it is Task<DateTime>
  • The name of the operation ends with Async which is a naming convention for marking new C# 5.0 async methods

Implementation of the service contract is equally trivial:

using System;
using System.Threading.Tasks;

namespace WcfService1
{
    public class TestService : ITestService
    {
        public async Task<DateTime> GetServerTimeAsync()
        {
            return DateTime.Now;
        }
    }
}

Implementation has three key moments:

  • Method name ends with Async
  • It returns Task<DateTime>
  • method has a async keyword which allows me to write a simple method body like I would do it normally and return a simple date time and completely forget about Task<T>

In other words, thanks to C# 5.0 all I have to do is to replace DateTime with async task<DateTime> and everything else stays the same – AWESOME!.

Client side code

I am going to add to the solution simple console application and create a trivial service client file

using System;
using System.ServiceModel;
using System.Threading.Tasks;
using WcfService1;

namespace ConsoleApplication1
{
    public class TestServiceClient : ClientBase<ITestService>, ITestService
    {
        public Task<DateTime> GetServerTimeAsync()
        {
            return Channel.GetServerTimeAsync();
        }
    }
}

No magic here: using shared service library I get the service contract on client and use it in combination with ClientBase<T> to create a simple class wrapper implementing via delegation service contract.

Now the class which simulates the one performing a wcf service call in its implementation

using System;
using System.Threading.Tasks;
using WcfService1;

namespace ConsoleApplication1
{
    public class ServerTimeReader
    {
        private readonly ITestService testService;

        public ServerTimeReader(ITestService testService)
        {
            this.testService = testService;
        }

        public async Task<DateTime> GetTimeAsync()
        {
            return await this.testService.GetServerTimeAsync();
        }
    }
}

The ServerTimeReader has a ITestService based dependency injected through its constructor. It has a method called GetTimeAsync which awaits the async wcf service call to be finished. All the mumbo jumbo of AMP, events etc. in a single keyword – brilliant.

Now when we have a class invoking a WCF call let’s ramp up IoC container and make a async call to server using the code we wrote so far.

using System;
using Microsoft.Practices.Unity;
using WcfService1;

namespace ConsoleApplication1
{
    class Program
    {
        private static UnityContainer container;

        static void Main(string[] args)
        {
            container = new UnityContainer();
            container.RegisterType<ITestService, TestService>();

            ReadTime();

            Console.ReadLine();
        }

        private static async void ReadTime()
        {
            var serverTimeReader = container.Resolve<ServerTimeReader>();
            var serverTime = await serverTimeReader.GetTimeAsync();

            Console.WriteLine("Server time:" + serverTime);
        }
    }
}

It's a console app so the entry point is static Main method which creates a new instance of IoC container (Unity in this sample) and adds to the IoC container mapping of the server side service contract with the service client I wrote

Then client calls a ReadTime method which uses IoC container to resolve ServerTimeReader instance injecting to it during that resolution a service client instance. Then the code awaits the GetTimeAsync method which awaits the service client call which results with a asynchronous call to a server being made and awaited on client.

Once the server returns the result to client the client shows it in console – that’s it.

image

Conclusion

The simplicity of the code performing fully async call to a WCF service is so brilliant that I am not going to write unit test here for the GetTimeAsync method because it should be quite obvious how to do that. The code is almost the same as it would be if it was written for sync WCF calls and to learn just how to tacklethe TPL/async specific things check out this stack overflow page recommended by my friend Slobodan Pavkov

That’s it folks – hope this will be useful to someone!