.NET and me Coding dreams since 1998!

21 Sep 2012

Awaitable task progress reporting

In my previous Task.WhileAll post, the sample contained a very fishy piece of code: I put a thread to sleep to give the async composition enough time to complete so the test could pass.

Every time I use Thread.Sleep anywhere, I know it is a bad sign, so I decided to get rid of it.

IProgressAsync<T>

The problem is that the IProgress<T> interface defines a void Report method, which can't be awaited and thus ruins composition.

That's why I decided to define my own "true async" version of the interface, which has the same report method but returns a Task I can await.

    
namespace WhileAllTests
{
    using System.Threading.Tasks;

    public interface IProgressAsync<in T>
    {
        Task ReportAsync(T value);
    }
}

Having an async version of reporting is VERY useful when the subscriber itself awaits further calls. I could get that with async void, but async void IMHO always turns out to be a bad solution, so I chose the Task-returning signature even though I need a custom interface for that.

And here's the implementation

    
    using System;
    using System.Threading.Tasks;

    public class ProgressAsync<T> : IProgressAsync<T>
    {
        private readonly Func<T, Task> handler;

        public ProgressAsync(Func<T, Task> handler)
        {
            this.handler = handler;
        }

        public async Task ReportAsync(T value)
        {
            await this.handler.Invoke(value);
        }
    }

No magic here:

  • Instead of Action<T>, my ctor accepts a Func<T, Task> so I can await it
  • ReportAsync awaits the Task returned by the handler, enabling composition
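
To illustrate the composition point, here is a minimal sketch of a subscriber whose handler itself awaits something. ProgressAsyncDemo, RunAsync and the Task.Delay call are made up for this illustration; the interface and class are repeated only to keep the snippet self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IProgressAsync<in T>
{
    Task ReportAsync(T value);
}

public class ProgressAsync<T> : IProgressAsync<T>
{
    private readonly Func<T, Task> handler;

    public ProgressAsync(Func<T, Task> handler)
    {
        this.handler = handler;
    }

    public async Task ReportAsync(T value)
    {
        await this.handler.Invoke(value);
    }
}

// Hypothetical demo class, not part of the original sample.
public static class ProgressAsyncDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // The handler is async itself; Task.Delay stands in for real async work.
        var progress = new ProgressAsync<int>(async value =>
        {
            await Task.Delay(10);
            log.Add("handled " + value);
        });

        // Each report is awaited, so the handler has finished before we move on.
        await progress.ReportAsync(1);
        log.Add("after report 1");
        await progress.ReportAsync(2);
        log.Add("after report 2");

        return log;
    }
}
```

Because every ReportAsync call is awaited, the "handled" entry for a value always lands in the log before the matching "after report" entry. With a void Report method the caller has no way to wait for that.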

With this in place, I can update my task extension method so that it awaits each progress report:

        public static async Task<IList<T>> WhileAll<T>(this IList<Task<T>> tasks, IProgressAsync<T> progress)
        {
            var result = new List<T>(tasks.Count);
            var remainingTasks = new List<Task<T>>(tasks);
            while (remainingTasks.Count > 0)
            {
                // Wait only on the tasks that are still pending. Awaiting the full
                // list would return immediately once any task had completed.
                await Task.WhenAny(remainingTasks);
                var stillRemainingTasks = new List<Task<T>>(remainingTasks.Count - 1);
                for (int i = 0; i < remainingTasks.Count; i++)
                {
                    if (remainingTasks[i].IsCompleted)
                    {
                        // Collect the result and wait until the subscriber has handled it.
                        var value = remainingTasks[i].Result;
                        result.Add(value);
                        await progress.ReportAsync(value);
                    }
                    else
                    {
                        stillRemainingTasks.Add(remainingTasks[i]);
                    }
                }

                remainingTasks = stillRemainingTasks;
            }

            return result;
        }

With all this in place, I can remove the thread sleep from my unit test and make it more useful:

 

    [TestClass]
    public class UnitTest1 {

        List<int> result = new List<int>();

        [TestMethod]
        public async Task TestMethod1() {

            var task1 = Task.Run(() => 101);
            var task2 = Task.Run(() => 102);
            var tasks = new List<Task<int>>() { task1, task2 };

            var listener = new ProgressAsync<int>(this.OnProgressAsync);
            var actual = await tasks.WhileAll(listener);

            Assert.AreEqual(2, this.result.Count);
            Assert.IsTrue(this.result.Contains(101));
            Assert.IsTrue(this.result.Contains(102));

            Assert.AreEqual(2, actual.Count);
            Assert.IsTrue(actual.Contains(101));
            Assert.IsTrue(actual.Contains(102));
        }

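        // Nothing to await here, so return a completed Task instead of marking the method async.
        private Task OnProgressAsync(int arg)
        {
            this.result.Add(arg);
            return Task.CompletedTask;
        }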
    }

There you go. The updated code of Task.WhileAll can be found here.

Filed under: Development No Comments
20 Sep 2012

Task.WhileAll

When Task.WhenAll met Task.WhenAny…

There are two methods for awaiting a collection of tasks in a non-blocking manner: Task.WhenAll and Task.WhenAny.

It is quite obvious how they work:

  • WhenAll completes when every task has completed,
  • WhenAny completes when any one of the tasks has completed.

Today I needed something that is a mix of the two, and I came up with a method that completes when all the tasks have completed but also gives me a hook to respond as each individual task completes.

I’ve called my little extension Task.WhileAll.
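
The difference between the two can be sketched with a pair of manually completed tasks. WhenAllWhenAnyDemo and RunAsync are names invented for this illustration, and TaskCompletionSource is used only to make the completion order deterministic.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original sample.
public static class WhenAllWhenAnyDemo
{
    public static async Task<string> RunAsync()
    {
        // Two tasks whose completion we control by hand.
        var first = new TaskCompletionSource<int>();
        var second = new TaskCompletionSource<int>();
        Task<int>[] tasks = { first.Task, second.Task };

        Task<Task<int>> any = Task.WhenAny(tasks);
        Task<int[]> all = Task.WhenAll(tasks);

        // Complete only the second task: WhenAny is done, WhenAll is not.
        second.SetResult(102);
        Task<int> winner = await any;
        bool allDoneEarly = all.IsCompleted;

        // Complete the remaining task: now WhenAll finishes too.
        first.SetResult(101);
        int[] results = await all; // results follow the order of the input tasks

        return $"{winner.Result} {allDoneEarly} {string.Join(",", results)}";
    }
}
```

Awaiting RunAsync yields "102 False 101,102": WhenAny hands back the task that finished first, while WhenAll stays pending until the last task is done and then returns the results in input order.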

Extension method

Here's the complete implementation

    
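using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TaskExtensions
{
    public static async Task<IList<T>> WhileAll<T>(this IList<Task<T>> tasks, IProgress<T> progress)
    {
        var result = new List<T>(tasks.Count);
        var remaining = new List<Task<T>>(tasks);
        while (remaining.Count > 0)
        {
            // Wait only on the tasks that are still pending. Awaiting the full
            // list would return immediately once any task had completed.
            await Task.WhenAny(remaining);
            var stillRunning = new List<Task<T>>(remaining.Count - 1);
            for (int i = 0; i < remaining.Count; i++)
            {
                if (remaining[i].IsCompleted)
                {
                    // Collect the result and notify the subscriber.
                    result.Add(remaining[i].Result);
                    progress.Report(remaining[i].Result);
                }
                else
                {
                    stillRunning.Add(remaining[i]);
                }
            }

            remaining = stillRunning;
        }

        return result;
    }
}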

 

The code is quite simple:

  • It is an async extension method extending IList<Task<T>>
  • On completion, the method returns an IList<T> (the results of the awaited tasks)
  • The method accepts an IProgress<T>, through which it publishes the result of each task that has just completed to interested subscribers
  • Inside the method body there is a loop that stays active as long as there are tasks with IsCompleted == false
  • The loop has a “sleep” line where I use await Task.WhenAny to asynchronously wait for any task to complete

Unit test

Here’s a simple unit test illustrating the usage of the extension method:

  
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    using Microsoft.VisualStudio.TestTools.UnitTesting;

    [TestClass]
    public class UnitTest1 {
        [TestMethod]
        public async Task TestMethod1() {

            var task1 = Task.Run(() => 101);
            var task2 = Task.Run(() => 102);
            var tasks = new List<Task<int>>() { task1, task2 };


            List<int> result = new List<int>();
            var listener = new Progress<int>(
                taskResult => {
                    result.Add(taskResult);
                });

            var actual = await tasks.WhileAll(listener);
            Thread.Sleep(50); // wait a bit for progress reports to complete

            Assert.AreEqual(2, result.Count);
            Assert.IsTrue(result.Contains(101));
            Assert.IsTrue(result.Contains(102));

            Assert.AreEqual(2, actual.Count);
            Assert.IsTrue(actual.Contains(101));
            Assert.IsTrue(actual.Contains(102));
        }
    }

Again, nothing complicated:

  • I create a list of two dumb tasks
  • I define the listener, which takes a task result and (for the needs of the unit test) adds it to the collection of results
  • I await the extension method on the task list, passing the listener
  • I wait for 50 ms so the progress reports have time to finish (Progress<T> invokes its handler asynchronously, so Report returns before the handler has run)
  • I check that both task results were reported to the subscriber
  • I check that both task results are returned as the result
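
Why the wait is needed at all can be shown with a small sketch. It assumes no SynchronizationContext is present (the code runs inside Task.Run for that reason), in which case Progress<T> queues its handler to the thread pool. ProgressPostDemo, RunAsync and the gate are invented for this illustration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original sample.
public static class ProgressPostDemo
{
    public static Task<(bool Before, bool After)> RunAsync()
    {
        // Task.Run puts us on a thread-pool thread, where no SynchronizationContext is set.
        return Task.Run(async () =>
        {
            var gate = new TaskCompletionSource<bool>();
            var handlerDone = new TaskCompletionSource<bool>();
            bool handled = false;

            IProgress<int> progress = new Progress<int>(_ =>
            {
                gate.Task.Wait();            // hold the handler until the caller has looked
                handled = true;
                handlerDone.SetResult(true);
            });

            progress.Report(1);              // returns although the handler has not finished
            bool before = handled;

            gate.SetResult(true);            // let the handler proceed
            await handlerDone.Task;          // something a plain Progress<T> caller cannot do

            return (Before: before, After: handled);
        });
    }
}
```

Before comes back false and After true: Report has already returned while the handler is still waiting, which is the gap the Thread.Sleep in the test papers over.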

Conclusion

That’s it. Dead simple code which I use A LOT to increase CPU utilization of my web crawlers.

Source code of the extension method and unit test can be found here.
