Showing posts with label Rx. Show all posts

Tuesday, September 20, 2011

When the Reactive Framework meets F# 3.0

A few days ago I wrote about asynchronous sequences in C#. This time we will see how easy it is to implement the same web crawler using the Reactive Framework and a new F# 3.0 feature called Query Expressions. The original implementation of the web crawler, created by Tomas Petricek, can be found here, and my implementation is surprisingly similar:

let rec randomCrawl url = 
    let visited = new System.Collections.Generic.HashSet<_>()

    let rec loop url = obs {
        if visited.Add(url) then
            let! doc = (downloadDocument url) |> fromAsync
            match doc with
            | Some doc ->
                yield url, getTitle doc
                for link in extractLinks doc do
                    yield! loop link
            | _ -> () }
    loop url

rxquery {
    for (url, title) in randomCrawl "http://news.bing.com" do
    where (url.Contains("bing.com") |> not)
    select title
    take 10 into gr
    iter (printfn "%s" gr)
}
|> ObservableExtensions.Subscribe |> ignore

There are two interesting things in the code above. The first one is the “obs { … }” construct. This is a custom implementation of a Computation Expression builder. obs is just a normal variable of a type that contains a special set of methods such as Bind, Delay, For, Combine and so on. The F# compiler translates the code inside the curly brackets into code calling those methods. In my case the whole expression returns an implementation of the IObservable<T> type. This allows us to write imperative code representing an observable source, where each “yield” produces the next value (OnNext of the subscribed observer is called) and the “let!” keyword makes the program wait for values from another observable source (let! works like the await keyword in C#). See the implementation of the builder class below:

type ObsBuiler() = 
    member this.Zero () = Observable.Empty(Scheduler.CurrentThread)
    member this.Yield v = Observable.Return(v, Scheduler.CurrentThread)
    member this.Delay (f: _ -> IObservable<_>) = Observable.Defer(fun _ -> f())
    member this.Combine (o1,o2) = Observable.Concat(o1,o2)
    member this.For (s:seq<_>, body : _ -> IObservable<_>) = Observable.Concat(s.Select(body))
    member this.YieldFrom a : IObservable<_> = a
    member this.Bind ((o : IObservable<_>),(f : _ -> IObservable<_>)) = o.SelectMany(f)
    member this.TryFinally((o : IObservable<_>),f : unit -> unit ) = Observable.Finally(o,fun _ -> f())
    member this.TryWith((o : IObservable<_>),f : Exception -> IObservable<_> ) = Observable.Catch(o,fun ex -> f(ex))
    member this.While (p,body) = Observable.While((fun () -> p()), body)
    member this.Using (dis,body) = Observable.Using( (fun () -> dis), fun d -> body(d))

let obs = new ObsBuiler()

The second interesting part is the “rxquery { … }” construct. This time we are using a feature called Query Expressions, introduced in F# 3.0, which was released a few days ago during the Build conference together with Visual Studio 11 and Windows 8. We can write queries very similar to LINQ queries, and the F# compiler translates them into code calling methods such as where, select, groupBy, take and so on. So it works like LINQ queries in C#, but here we can extend the set of available operations arbitrarily! Look at the Zip and ForkJoin methods below, which are not available by default in the QueryBuilder implementation working with the IEnumerable<T> type. Let’s see the implementation of the query builder class:

type RxQueryBuiler() =  
    member this.For (s:IObservable<_>, body : _ -> IObservable<_>) = s.SelectMany(body)
    [<CustomOperation("select", AllowIntoPattern=true)>]
    member this.Select (s:IObservable<_>, [<ProjectionParameter>] selector : _ -> _) = s.Select(selector)
    [<CustomOperation("where", MaintainsVariableSpace=true, AllowIntoPattern=true)>]
    member this.Where (s:IObservable<_>, [<ProjectionParameter>] predicate : _ -> bool ) = s.Where(predicate)
    [<CustomOperation("takeWhile", MaintainsVariableSpace=true, AllowIntoPattern=true)>]
    member this.TakeWhile (s:IObservable<_>, [<ProjectionParameter>] predicate : _ -> bool ) = s.TakeWhile(predicate)
    [<CustomOperation("take", MaintainsVariableSpace=true, AllowIntoPattern=true)>]
    member this.Take (s:IObservable<_>, count) = s.Take(count)
    [<CustomOperation("skipWhile", MaintainsVariableSpace=true, AllowIntoPattern=true)>]
    member this.SkipWhile (s:IObservable<_>, [<ProjectionParameter>] predicate : _ -> bool ) = s.SkipWhile(predicate)
    [<CustomOperation("skip", MaintainsVariableSpace=true, AllowIntoPattern=true)>]
    member this.Skip (s:IObservable<_>, count) = s.Skip(count)
    member this.Zero () = Observable.Empty(Scheduler.CurrentThread)
    member this.Yield (value) = Observable.Return(value)
    [<CustomOperation("count")>]
    member this.Count (s:IObservable<_>) = Observable.Count(s)
    [<CustomOperation("all")>]
    member this.All (s:IObservable<_>, [<ProjectionParameter>] predicate : _ -> bool ) = s.All(new Func<_,bool>(predicate))
    [<CustomOperation("contains")>]
    member this.Contains (s:IObservable<_>, key) = s.Contains(key)
    [<CustomOperation("distinct", MaintainsVariableSpace=true, AllowIntoPattern=true)>]
    member this.Distinct (s:IObservable<_>) = s.Distinct()
    [<CustomOperation("exactlyOne")>]
    member this.ExactlyOne (s:IObservable<_>) = s.Single()
    [<CustomOperation("exactlyOneOrDefault")>]
    member this.ExactlyOneOrDefault (s:IObservable<_>) = s.SingleOrDefault()
    [<CustomOperation("find")>]
    member this.Find (s:IObservable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.First(new Func<_,bool>(predicate))
    [<CustomOperation("head")>]
    member this.Head (s:IObservable<_>) = s.First()
    [<CustomOperation("headOrDefault")>]
    member this.HeadOrDefault (s:IObservable<_>) = s.FirstOrDefault()
    [<CustomOperation("last")>]
    member this.Last (s:IObservable<_>) = s.Last()
    [<CustomOperation("lastOrDefault")>]
    member this.LastOrDefault (s:IObservable<_>) = s.LastOrDefault()
    [<CustomOperation("maxBy")>]
    member this.MaxBy (s:IObservable<'a>, [<ProjectionParameter>] valueSelector : 'a -> 'b) = s.MaxBy(new Func<'a,'b>(valueSelector))
    [<CustomOperation("minBy")>]
    member this.MinBy (s:IObservable<'a>, [<ProjectionParameter>] valueSelector : 'a -> 'b) = s.MinBy(new Func<'a,'b>(valueSelector))
    [<CustomOperation("nth")>]
    member this.Nth (s:IObservable<'a>, index ) = s.ElementAt(index)
    [<CustomOperation("sumBy")>]
    member inline this.SumBy (s:IObservable<_>,[<ProjectionParameter>] valueSelector : _ -> _) = s.Select(valueSelector).Aggregate(Unchecked.defaultof<_>, new Func<_,_,_>( fun a b -> a + b))
    [<CustomOperation("groupBy", AllowIntoPattern=true)>]
    member this.GroupBy (s:IObservable<_>,[<ProjectionParameter>] keySelector : _ -> _) = s.GroupBy(new Func<_,_>(keySelector))
    [<CustomOperation("groupValBy", AllowIntoPattern=true)>]
    member this.GroupValBy (s:IObservable<_>,[<ProjectionParameter>] resultSelector : _ -> _,[<ProjectionParameter>] keySelector : _ -> _) = s.GroupBy(new Func<_,_>(keySelector),new Func<_,_>(resultSelector))
    [<CustomOperation("join", IsLikeJoin=true)>]
    member this.Join (s1:IObservable<_>,s2:IObservable<_>, [<ProjectionParameter>] s1KeySelector : _ -> _,[<ProjectionParameter>] s2KeySelector : _ -> _,[<ProjectionParameter>] resultSelector : _ -> _) = s1.Join(s2,new Func<_,_>(s1KeySelector),new Func<_,_>(s2KeySelector),new Func<_,_,_>(resultSelector))
    [<CustomOperation("groupJoin", AllowIntoPattern=true)>]
    member this.GroupJoin (s1:IObservable<_>,s2:IObservable<_>, [<ProjectionParameter>] s1KeySelector : _ -> _,[<ProjectionParameter>] s2KeySelector : _ -> _,[<ProjectionParameter>] resultSelector : _ -> _) = s1.GroupJoin(s2,new Func<_,_>(s1KeySelector),new Func<_,_>(s2KeySelector),new Func<_,_,_>(resultSelector))
    [<CustomOperation("zip", IsLikeZip=true)>]
    member this.Zip (s1:IObservable<_>,s2:IObservable<_>,[<ProjectionParameter>] resultSelector : _ -> _) = s1.Zip(s2,new Func<_,_,_>(resultSelector))
    [<CustomOperation("forkJoin", IsLikeZip=true)>]
    member this.ForkJoin (s1:IObservable<_>,s2:IObservable<_>,[<ProjectionParameter>] resultSelector : _ -> _) = s1.ForkJoin(s2,new Func<_,_,_>(resultSelector))
    [<CustomOperation("iter")>]
    member this.Iter(s:IObservable<_>, [<ProjectionParameter>] selector : _ -> _) = s.Do(selector)

let rxquery = new RxQueryBuiler()
 

Sunday, September 4, 2011

Programming with C# asynchronous sequences

Tomas Petricek, in his latest blog post titled “Programming with F# asynchronous sequences”, presents an F# implementation of something called asynchronous sequences. In this post I will show you how the same concept can be implemented in C#. Let’s look at the sample code below to better understand what an asynchronous sequence is:

IEnumerable<...> AsyncSeq()
{
    yield return "Hello";
    await TaskEx.Delay(100);
    yield return "world!";
}

An asynchronous sequence is code that produces a sequence of values generated on demand (this is how the IEnumerable interface can be interpreted) but additionally does some asynchronous work during the evaluation process (the await keyword). Every time the client of the asynchronous sequence calls the MoveNext method, the next value is evaluated. The key feature here is that the client decides when to produce the next value and when to stop processing.

There are two problems with such an implementation of an asynchronous sequence. First, sequences in the .NET world are represented by the IEnumerable interface, but that interface allows only synchronous processing. Since the MoveNext method returns a bool value, we need to decide immediately whether the next value can be produced or not. In asynchronous processing it can take a few minutes or even hours to provide such information. The second problem is that so far we cannot mix the await keyword (Async CTP) with the yield return/yield break keywords inside the same method body. My solution resolves both problems, and the above sequence can be implemented the following way:

IEnumerable<AsyncSeqItem<string>> AsyncSeq()
{
    yield return "Hello";
    yield return TaskEx.Delay(100);
    yield return "world!";
}

public enum AsyncSeqItemMode
{
    Value, Task, Sequence
}

public class AsyncSeqItem<T>
{
    public AsyncSeqItemMode Mode { get; private set; }
    public T Value { get; private set; }
    public Task Task { get; private set; }
    public IEnumerable<AsyncSeqItem<T>> Seq { get; private set; }

    public AsyncSeqItem(T value)
    {
        Value = value;
        Mode = AsyncSeqItemMode.Value;
    }

    public AsyncSeqItem(Task task)
    {
        Task = task;
        Mode = AsyncSeqItemMode.Task;
    }

    public AsyncSeqItem(IEnumerable<AsyncSeqItem<T>> seq)
    {
        Seq = seq;
        Mode = AsyncSeqItemMode.Sequence;
    }

    public static implicit operator AsyncSeqItem<T>(T value)
    {
        return new AsyncSeqItem<T>(value);
    }

    public static implicit operator AsyncSeqItem<T>(Task task)
    {
        return new AsyncSeqItem<T>(task);
    }
}

AsyncSeqItem represents one of the following three values:

  • Value – the next value generated by the sequence
  • Task – some asynchronous work that needs to be awaited before going forward
  • Sequence – used with recursive calls; it means that we want to use tail recursion

There are two ways of consuming such a sequence in the client:

public static class AsyncSeqExtensions
{
    public static IEnumerable<Task<Option<T>>> ToTaskEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    { ... }

    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    { ... }
}

public class Option<T>
{
    public T Value { get; private set; }
    public bool HasValue { get; private set; }

    public Option()
    {
        HasValue = false;
    }

    public Option(T value)
    {
        Value = value;
        HasValue = true;
    }

    public static implicit operator Option<T>(T value)
    {
        return new Option<T>(value);
    }
}

In the first approach we call the ToTaskEnumerable extension method, which returns a sequence of tasks. Each task wraps a special type called Option<T>, which can be used similarly to the Nullable<T> type except that it works with both value and reference types. Returning a task with an option object without a value means that we have reached the end of the sequence. I also provide a few standard LINQ operators built on top of such sequence semantics:

public static class AsyncSeqExtensions
{
    async private static Task ForEachTaskImpl<T>(this IEnumerable<Task<Option<T>>> seq, Action<Task<Option<T>>> action)
    {
        foreach (var task in seq)
        {
            await task;
            action(task);
        }
    }
    public static Task ForEachTask<T>(this IEnumerable<Task<Option<T>>> seq, Action<Task<Option<T>>> action)
    {
        return ForEachTaskImpl(seq, action);
    }

    public static Task ForEach<T>(this IEnumerable<Task<Option<T>>> seq, Action<T> action)
    {
        return seq.ForEachTask(task =>
        {
            if(task.Result.HasValue)
                action(task.Result.Value);
        });
    }

    async private static Task<T[]> ToArrayImpl<T>(IEnumerable<Task<Option<T>>> seq)
    {
        var list = new List<T>();
        await seq.ForEach(v => list.Add(v));
        return list.ToArray();
    }
    public static Task<T[]> ToArray<T>(this IEnumerable<Task<Option<T>>> seq)
    {
        return ToArrayImpl(seq);
    }

    public static IEnumerable<Task<Option<TResult>>> Select<T, TResult>(this IEnumerable<Task<Option<T>>> source,
        Func<T,TResult> selector) { ... }

    public static IEnumerable<Task<Option<T>>> Where<T>(this IEnumerable<Task<Option<T>>> source,
        Func<T, bool> predicate) { ... }

    public static IEnumerable<Task<Option<T>>> Take<T>(this IEnumerable<Task<Option<T>>> source,
        int count) { ... }

    ...
}
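The end-of-sequence convention used by this first representation can be illustrated with a minimal, self-contained sketch. It is not the code from the download: Option<T> is re-declared here in a simplified form, and TaskSequenceSketch, Produce and Collect are hypothetical names introduced only for this illustration. It also assumes a current C# compiler rather than the Async CTP.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Simplified stand-in for the Option<T> type shown earlier.
public sealed class Option<T>
{
    public T Value { get; }
    public bool HasValue { get; }
    public Option() { HasValue = false; }
    public Option(T value) { Value = value; HasValue = true; }
}

public static class TaskSequenceSketch
{
    // Hypothetical producer: two values followed by an empty Option,
    // which marks the end of the sequence.
    public static IEnumerable<Task<Option<string>>> Produce()
    {
        yield return Task.FromResult(new Option<string>("Hello"));
        yield return Task.FromResult(new Option<string>("world!"));
        yield return Task.FromResult(new Option<string>()); // end marker
    }

    // Awaits each task in turn and stops at the first empty Option.
    public static async Task<List<T>> Collect<T>(IEnumerable<Task<Option<T>>> seq)
    {
        var result = new List<T>();
        foreach (var task in seq)
        {
            var item = await task;
            if (!item.HasValue) break; // end of sequence reached
            result.Add(item.Value);
        }
        return result;
    }
}
```

Calling TaskSequenceSketch.Collect(TaskSequenceSketch.Produce()) yields a list with the two strings; the empty Option itself is never handed to the caller.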

Returning an additional task object with a special value at the end of a sequence allows us to use the standard IEnumerable<T> interface, but it is a little inconvenient. In the second approach we use the IAsyncEnumerable interface from the Reactive Framework library released some time ago.

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}

public interface IAsyncEnumerator<out T> : IDisposable
{
    Task<bool> MoveNext();
    T Current { get; }
}

public static class AsyncEnumerable
{
    public static IAsyncEnumerable<TResult> Select<TSource, TResult>(IAsyncEnumerable<TSource> source,
        Func<TSource, TResult> selector) { ... }

    public static IAsyncEnumerable<TSource> Where<TSource>(IAsyncEnumerable<TSource> source,
        Func<TSource, bool> predicate) { ... }

    public static IAsyncEnumerable<TSource> Take<TSource>(IAsyncEnumerable<TSource> source,
        int n) { ... }
}

This interface perfectly represents the semantics of an asynchronous sequence. The Rx library also provides many standard LINQ operators such as Where, Select, Take, Sum, First and so on. This allows us to write almost any LINQ query executing on top of an asynchronous sequence.

Now let’s summarize what we have achieved so far. We can write imperative code implementing an asynchronous sequence. We can use an extension method to create one of two asynchronous sequence representations. Finally, we can iterate through all items in such a sequence, or we can build a new sequence object using LINQ operators.
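For comparison only: later versions of the language (C# 8.0 and newer) allow await and yield return in the same method body, using an IAsyncEnumerable<T> interface from System.Collections.Generic whose shape differs from the Rx one shown above (GetAsyncEnumerator and MoveNextAsync instead of GetEnumerator and MoveNext). The sketch below assumes such a compiler and is not part of the library described in this post; AsyncIteratorSketch and Collect are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncIteratorSketch
{
    // await and yield return mixed in one method body (C# 8.0 or later).
    public static async IAsyncEnumerable<string> AsyncSeq()
    {
        yield return "Hello";
        await Task.Delay(100);
        yield return "world!";
    }

    // The consumer pulls values one by one; each pull may complete asynchronously.
    public static async Task<List<string>> Collect()
    {
        var items = new List<string>();
        await foreach (var s in AsyncSeq())
            items.Add(s);
        return items;
    }
}
```

As in the approach described here, the consumer decides when the next value is produced: nothing after the first yield return runs until the next element is requested.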

The C# version of the web crawler presented in Tomas Petricek’s blog post could look like this:

public static class AsyncSeqSample
{
    async public static Task CrawlBingUsingAsyncEnumerable()
    {
        await RandomCrawl("http://news.bing.com")
            .ToAsyncEnumerable()
            .Where(t => !t.Item1.Contains("bing.com"))
            .Select(t => t.Item2)
            .Take(10)
            .ForEach(Console.WriteLine);

        Console.WriteLine("the end...");
    }

    async public static Task CrawlBingUsingTaskEnumerable()
    {
        await RandomCrawl("http://news.bing.com")
            .ToTaskEnumerable()
            .Where(t => !t.Item1.Contains("bing.com"))
            .Select(t => t.Item2)
            .Take(10)
            .ForEach(Console.WriteLine);

        Console.WriteLine("the end...");
    }

    public static IEnumerable<AsyncSeqItem<Tuple<string, string>>> RandomCrawl(string url)
    {
        return RandomCrawlLoop(url, new HashSet<string>());
    }

    private static IEnumerable<AsyncSeqItem<Tuple<string,string>>> RandomCrawlLoop(string url, HashSet<string> visited)
    {
        if (visited.Add(url))
        {
            var downloadTask = DownloadDocument(url);
            yield return downloadTask;
            if (downloadTask.Result.HasValue)
            {
                var doc = downloadTask.Result.Value;
                yield return Tuple.Create(url, GetTitle(doc));
                foreach (var link in ExtractLinks(doc))
                {
                    foreach (var l in RandomCrawlLoop(link, visited))
                    {
                        yield return l;
                    }
                }
            }
        }
    }

    private static string[] ExtractLinks(HtmlDocument doc)
    {
        try
        {
            var q = from a in doc.DocumentNode.SelectNodes("//a")
                    where a.Attributes.Contains("href")
                    let href = a.Attributes["href"].Value
                    where href.StartsWith("http://")
                    let endl = href.IndexOf('?')
                    select endl > 0 ? href.Substring(0, endl) : href;

            return q.ToArray();
        }
        catch
        {
            return new string[0];
        }
    }

    async private static Task<Option<HtmlDocument>> DownloadDocument(string url)
    {
        try
        {
            var client = new WebClient();
            var html = await client.DownloadStringTaskAsync(url);
            var doc = new HtmlDocument();
            doc.LoadHtml(html);
            return new Option<HtmlDocument>(doc);
        }
        catch (Exception)
        {
            return new Option<HtmlDocument>();
        }
    }

    private static string GetTitle(HtmlDocument doc)
    {
        var title = doc.DocumentNode.SelectSingleNode("//title");
        return title != null ? title.InnerText.Trim() : "Untitled";
    }
}

Now let’s see how the ToAsyncEnumerable and ToTaskEnumerable methods have been implemented:

public static class AsyncSeqExtensions
{
    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    {
        if (seq == null) throw new ArgumentNullException("seq");

        return new AnonymousAsyncEnumerable<T>(() =>
        {
            var enumerator = seq.ToTaskEnumerable(continueOnCapturedContext).GetEnumerator();
            seq = null; // holding reference to seq parameter introduces memory leaks when asynchronous sequence uses recursive calls
            TaskCompletionSource<bool> currentTcs = null;
            Task<Option<T>> currentTask = null;

            return new AnonymousAsyncEnumerator<T>(
                () =>
                {
                    currentTcs = new TaskCompletionSource<bool>();

                    if (CheckEndOfSeq(currentTask) == false)
                    {
                        currentTcs.SetResult(false);
                        return currentTcs.Task;
                    }

                    enumerator.MoveNext();

                    enumerator.Current.ContinueWith(t =>
                    {
                        if (t.IsFaulted)
                        {
                            currentTcs.SetException(t.Exception);
                        }
                        else
                        {
                            if (!t.Result.HasValue)
                            {
                                currentTcs.SetResult(false);
                            }
                            else
                            {
                                currentTask = t;
                                currentTcs.SetResult(true);
                            }
                        }
                    });

                    return currentTcs.Task;
                },
                () => currentTask.Result.Value
            );
        });
    }

    public static IEnumerable<Task<Option<T>>> ToTaskEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    {
        if (seq == null) throw new ArgumentNullException("seq");

        return new AnonymousEnumerable<Task<Option<T>>>(() =>
        {
            var synchronizationContext = continueOnCapturedContext ? SynchronizationContext.Current : null;

            var enumerator = seq.GetEnumerator();
            seq = null; // holding reference to seq parameter introduces memory leaks when asynchronous sequence uses recursive calls

            TaskCompletionSource<Option<T>> currentTcs = null;

            return new AnonymousEnumerator<Task<Option<T>>>(
                () =>
                {
                    if (CheckEndOfSeq(currentTcs) == false)
                        return false;

                    currentTcs = new TaskCompletionSource<Option<T>>();

                    Action moveNext = null;
                    moveNext = () =>
                    {
                        Start:

                        bool b;

                        try
                        {
                            b = enumerator.MoveNext();
                        }
                        catch (Exception exception)
                        {
                            currentTcs.SetException(exception);
                            return;
                        }

                        if (b == false)
                        {
                            currentTcs.SetResult(new Option<T>());
                        }
                        else
                        {
                            var c = enumerator.Current;
                            if (c.Mode == AsyncSeqItemMode.Value)
                            {
                                currentTcs.SetResult(c.Value);
                            }
                            else if (c.Mode == AsyncSeqItemMode.Task)
                            {
                                if (synchronizationContext != null)
                                    c.Task.ContinueWith(_ => synchronizationContext.Post(s => ((Action)s)(), moveNext));
                                else
                                    c.Task.ContinueWith(_ => moveNext());
                            }
                            else if (c.Mode == AsyncSeqItemMode.Sequence)
                            {
                                enumerator = c.Seq.GetEnumerator();
                                goto Start;
                            }
                        }
                    };

                    moveNext();

                    return true;
                },
                () => currentTcs.Task
            );
        });
    }
}

As you can see, the implementation is really simple, but the whole concept of asynchronous sequences is very powerful.

Download

Friday, May 6, 2011

Rx projects update

In the past I have written about my projects related to the Reactive Framework. New versions of Rx are published every month or two, and of course I have been updating my projects for my own purposes, but the updates have not been publicly available. The latest release of Rx (RC0) distinguishes two versions, “Stable” and “Experimental”, so I think we can expect that the Rx API is really close to the final version. I thought it was a great opportunity to publish updated versions of all my projects:

Basically the content of the first two projects stays the same, but RxSandbox has changed a little bit. A few new operators have been added, for a current total of 69; additionally, operators that are still in the experimental state (not stable) are highlighted. I am considering moving the RxSandbox application from WPF to Silverlight, which would simplify deployment and keep end users always up to date with the newest version of Rx. What do you think?

Download.

Friday, January 21, 2011

Async CTP on WP7

In this post I will show you how to use the Async CTP, released during the last PDC conference, inside your Windows Phone application. The Async CTP extends two .NET platform languages, C# and VB, giving us a new way of writing asynchronous code. There are two main parts to this project: a new C#/VB compiler (two new keywords: async and await) and the AsyncCtpLibrary.dll library. When the compiler sees an asynchronous method, it generates IL code (which we will see in detail further on), and that code uses some types from the mentioned library. The problem is that so far we have only two versions of the library: .NET and Silverlight. So in this post I will show you how we can use the Async CTP on Windows Phone 7 despite these limitations.

The first thing we need to do before we write our first asynchronous method in a WP7 project is to add a reference to the Silverlight version of the library, AsyncCtpLibrary_Silverlight.dll. Just after doing so, Visual Studio shows a warning: “Adding a reference to a Silverlight assembly may lead to unexpected application behavior. Do you want to continue?”. The problem is that the assembly is compiled against the full version of Silverlight, and Windows Phone does not contain the full version but a subset of it running on top of the .NET Compact Framework. So if we execute code from such an assembly that uses types not available on the phone, we will get a runtime exception. The key element of AsyncCtpLibrary is Tasks, and unfortunately they do not run on the phone; they throw an exception at runtime. The question is: can we use the Async CTP without Tasks? Of course we can! The mechanism behind asynchronous methods doesn’t force us to follow the await keyword with an instance of the Task type. The awaited type only needs to have a method (instance or extension) called GetAwaiter returning any type that has an appropriate pair of methods (instance or extension), BeginAwait and EndAwait. All I did was implement such a matching type.

public enum AwaiterResultType
{
    Completed,
    Failed,
    Cancelled
}

public class AwaiterResult<T>
{
    public AwaiterResultType ResultType { get; private set; }
    public Exception Exception { get; private set; }
    public T Value { get; private set; }

    private AwaiterResult() { }
    public static AwaiterResult<T> Completed(T result)
    {
        return new AwaiterResult<T> { ResultType = AwaiterResultType.Completed, Value = result };
    }
    public static AwaiterResult<T> Failed(Exception exception)
    {
        return new AwaiterResult<T> { ResultType = AwaiterResultType.Failed, Exception = exception };
    }
    public static AwaiterResult<T> Cancelled()
    {
        return new AwaiterResult<T> { ResultType = AwaiterResultType.Cancelled };
    }
}

public class Awaiter<T>
{
    private SynchronizationContext Context { get; set; }
    private bool IsSynchronized { get; set; }
    private Action _continuation;

    public AwaiterResult<T> Result { get; private set; }

    private Awaiter() { }

    public Awaiter<T> GetAwaiter()
    {
        return this;
    }

    public bool BeginAwait(Action continuation)
    {
        if (Result != null)
            return false;

        _continuation += continuation;
        return true;
    }

    public T EndAwait()
    {
        if (Result == null)
            throw new InvalidOperationException("Awaiter does not contain the result yet.");

        if (Result.ResultType == AwaiterResultType.Completed)
            return Result.Value;

        if (Result.ResultType == AwaiterResultType.Failed)
            throw Result.Exception;

        return default(T); // if cancelled
    }

    private void OnResult(AwaiterResult<T> result)
    {
        if (result == null) throw new ArgumentNullException("result");
        if (Result != null) throw new InvalidOperationException("The result can be provided only once.");

        Result = result;
        if (IsSynchronized && Context != null)
        {
            Context.Post(o =>
                             {
                                 if (_continuation != null)
                                     _continuation();
                                 _continuation = null;
                             }, null);
            Context = null;
        }
        else
        {
            if (_continuation != null)
                _continuation();
            _continuation = null;
        }
    }

    public static Awaiter<T> Create(Action<Action<AwaiterResult<T>>> resultProvider, bool synchronizeWithCurrentContext = true)
    {
        if (resultProvider == null) throw new ArgumentNullException("resultProvider");

        var awaiter = new Awaiter<T> { IsSynchronized = synchronizeWithCurrentContext };
        if (synchronizeWithCurrentContext)
            awaiter.Context = SynchronizationContext.Current;
        resultProvider(awaiter.OnResult);
        return awaiter;
    }
}

The key class here is the Awaiter class, containing the three required methods: GetAwaiter, BeginAwait and EndAwait. It is an abstraction of asynchronous work (very similar to the Task type) which at some point finishes in one of three possible states: an exception was thrown, someone cancelled the execution, or the work completed correctly, returning some result. The constructor is private, so the factory method called Create is the only way to create an instance of the Awaiter type. So let’s see how we can use it:

public static class AwaiterEx
{
    public static Awaiter<T> Run<T>(Func<T> action)
    {
        return Awaiter<T>.Create(resultProvider =>
        {
            ThreadPool.QueueUserWorkItem(o =>
            {
                try
                {
                    var result = action();
                    resultProvider(AwaiterResult<T>.Completed(result));
                }
                catch (Exception exception)
                {
                    resultProvider(AwaiterResult<T>.Failed(exception));
                }
            },null);
        });         
    }

    public static Awaiter<object> Delay(TimeSpan timeSpan)
    {
        return Awaiter<object>.Create(resultProvider =>
        {
            var timer = new Timer(o => resultProvider(AwaiterResult<object>.Completed(null)));
            timer.Change((int)timeSpan.TotalMilliseconds, -1);
        });
    }
}

async private static void Sample()
{
    int intResult = await AwaiterEx.Run(() =>
    {
        Thread.Sleep(3000);
        return 123;
    });
    MessageBox.Show("Completed: " + intResult);

    await AwaiterEx.Delay(TimeSpan.FromSeconds(3));
    MessageBox.Show("After 3 seconds...");
}

Now let’s look at a simplified version of what the compiler generates underneath to allow us to write synchronous-looking code that runs asynchronously.

private static void SampleInternals()
{
    Awaiter<int> awaiter1 = null;
    Awaiter<object> awaiter2 = null;
    int state = 0;
    
    Action action = null;
    action = () =>
    {
        if (state == 1) goto JUMP_LABEL_1;
        if (state == 2) goto JUMP_LABEL_2;
        
        awaiter1 = AwaiterEx.Run(() =>
        {
            Thread.Sleep(3000);
            return 123;
        }).GetAwaiter();
        state = 1;
        
        if (awaiter1.BeginAwait(action))
            return;
        
        JUMP_LABEL_1:
        var intResult = awaiter1.EndAwait();
        MessageBox.Show("Completed: " + intResult);


        awaiter2 = AwaiterEx.Delay(TimeSpan.FromSeconds(3)).GetAwaiter();
        state = 2;

        if (awaiter2.BeginAwait(action))
            return;

        JUMP_LABEL_2:
        awaiter2.EndAwait();
        
        MessageBox.Show("After 3 seconds...");
    };

    action();         
}
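For comparison only: in the C# version that eventually shipped (C# 5), the awaitable pattern kept GetAwaiter, but the BeginAwait/EndAwait pair of the CTP was replaced by an IsCompleted property, an OnCompleted method (from INotifyCompletion) and a GetResult method. The sketch below assumes a current C# compiler rather than the Async CTP; DelayedValue<T> and AwaitablePatternSketch are hypothetical names introduced for this illustration and are not part of the library described in this post.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical awaitable that yields a fixed value after a delay.
public sealed class DelayedValue<T>
{
    private readonly T _value;
    private readonly int _milliseconds;

    public DelayedValue(T value, int milliseconds)
    {
        _value = value;
        _milliseconds = milliseconds;
    }

    // The compiler looks for this method on the awaited expression.
    public Awaiter GetAwaiter() => new Awaiter(_value, _milliseconds);

    public sealed class Awaiter : INotifyCompletion
    {
        private readonly T _value;
        private readonly int _milliseconds;

        public Awaiter(T value, int milliseconds)
        {
            _value = value;
            _milliseconds = milliseconds;
        }

        // Always false here, so the asynchronous path is always taken.
        public bool IsCompleted => false;

        // Plays the role of BeginAwait: registers the continuation.
        public void OnCompleted(Action continuation)
        {
            Task.Delay(_milliseconds).ContinueWith(_ => continuation());
        }

        // Plays the role of EndAwait: returns the result.
        public T GetResult() => _value;
    }
}

public static class AwaitablePatternSketch
{
    public static async Task<int> Run()
    {
        int result = await new DelayedValue<int>(123, 50);
        return result + 1;
    }
}
```

The state machine the compiler builds around such an awaiter has the same overall shape as the one above: register a continuation, return, and resume at the saved state when the continuation is invoked.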

The last thing worth mentioning is how to write an asynchronous method returning some value. Asynchronous methods have two limitations: void, Task and Task<T> are the only valid return types, and out parameters are not allowed. As we said before, Tasks don’t work under WP7, so we cannot return a Task object. In fact the solution is very simple:

public static class AwaiterEx
{
    public static Awaiter<object> BeginWriteAwaiter(this Stream stream, byte[] buffer, int offset, int numBytes)
    {
        return Awaiter<object>.Create(resultProvider =>
        {
            try
            {
                stream.BeginWrite(buffer, offset, numBytes, o =>
                {
                    try
                    {
                        stream.EndWrite(o);
                        resultProvider(AwaiterResult<object>.Completed(null));
                    }
                    catch (Exception exception)
                    {
                        resultProvider(AwaiterResult<object>.Failed(exception));
                    }
                }, null);
            }
            catch (Exception exception)
            {
                resultProvider(AwaiterResult<object>.Failed(exception));
            }
        });
    }
}

public Awaiter<object> WriteFileAwaiter(string path, string text)
{
    return Awaiter<object>.Create(async resultProvider =>
    {
        try
        {
            using (var stream = IsolatedStorageFile.OpenFile(path, FileMode.Create))  
                // caution: in fact the stream is running synchronously on WP7
            {
                var data = Encoding.Unicode.GetBytes(text);
                await stream.BeginWriteAwaiter(data, 0, data.Length);
                resultProvider(AwaiterResult<object>.Completed(null));
            }
        }
        catch (Exception exception)
        {
            resultProvider(AwaiterResult<object>.Failed(exception));
        }
    });
}

I encourage you to download the sources with the attached samples and play with the Async CTP on WP7, because it changes a lot in terms of writing asynchronous code. The AsyncCtpLibrary library provides the TaskEx class with a few very useful methods, so you can find its counterpart in my library, called AwaiterEx, with the following API:

public static class AwaiterEx
{
    public static Awaiter<string> DownloadStringAwaiter(this WebClient webClient, Uri uri) { ... }
    public static Awaiter<object> BeginWriteAwaiter(this Stream stream, byte[] buffer, int offset, int numBytes) { ... }
  
    public static Awaiter<T> ToAwaiter<T>(this IObservable<T> observable) { ... }
    public static Awaiter<T[]> ToAwaiterAll<T>(this IObservable<T> observable) { ... }
    public static IObservable<T> ToObservable<T>(this Awaiter<T> awaiter) { ... }
    
    public static Awaiter<T[]> WhenAll<T>(this IEnumerable<Awaiter<T>> awaiters) { ... }
    public static Awaiter<T[]> WhenAll<T>(params Awaiter<T>[] awaiters) { ... }
    public static Awaiter<T> Run<T>(Func<T> action) { ... }
    public static Awaiter<object> Delay(TimeSpan timeSpan) { ... }
}

Have fun and let me know if you liked it or not. Download

Monday, October 25, 2010

Debugging Reactive Framework (RxDebugger) and Linq to objects (LinqDebugger)

[New version (2011.05.06)]

[Project download has been upgraded to the newest version of Rx (Build v1.0.2787.0) and VS2010 ] download
(Changes: projects converted to VS2010, sample project for RxDebugger added)

In this post I will present two projects, LinqDebugger and RxDebugger. A few months ago, after reading Bart De Smet’s great post about tracing the execution of Linq to objects queries, I was wondering whether it would be possible to implement the same concept in a more general way. If we wanted to trace the execution of all of the Linq operators using the described approach, we would have to implement many extension methods, one for each Linq operator. How can we avoid this? LinqDebugger is the answer ;). If you are wondering what lazy evaluation is and how to debug Linq to objects queries, this project can be very useful. Let’s look at a very simple query:

var q =
    from i in Enumerable.Range(0, 12)
    where i > 5 && i % 2 == 0
    select i.ToString("C");

foreach (var s in q)
    Console.WriteLine(s);

Now let’s debug that query:

var q =
    from i in Enumerable.Range(0, 12).AsDebuggable()
    where i > 5 && i % 2 == 0
    select i.ToString("C");

foreach (var s in q)
    Console.WriteLine(s); 

After running this code you will find the following text on the console:

Select creation
Select begin
Where creation
Where begin
 predicate i => ((i > 5) && ((i % 2) = 0)) : (0) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (1) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (2) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (3) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (4) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (5) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (6) => True
Where end 6
 selector i => i.ToString("C") : (6) => 6,00 zł
Select end 6,00 zł
6,00 zł
Select begin
Where begin
 predicate i => ((i > 5) && ((i % 2) = 0)) : (7) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (8) => True
Where end 8
 selector i => i.ToString("C") : (8) => 8,00 zł
Select end 8,00 zł
8,00 zł
Select begin
Where begin
 predicate i => ((i > 5) && ((i % 2) = 0)) : (9) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (10) => True
Where end 10
 selector i => i.ToString("C") : (10) => 10,00 zł
Select end 10,00 zł
10,00 zł
Select begin
Where begin
 predicate i => ((i > 5) && ((i % 2) = 0)) : (11) => False
Where end (no result)
Select end (no result)

This text shows how the query was executed. It contains information about the creation of enumerator objects, about data passing from one enumerator to another, and about the execution of every function used inside the query. One thing worth mentioning is that everything is presented in the same order in which it was executed. With this information we can easily figure out, for example, in which iteration an exception was thrown and which values the query was processing at that moment. If this text representation is hard to read, I also provide a VS visualizer for the ExecutionPlan type that presents the same information in a tree control (copy the LinqDebugger.dll and LinqDebugger.Visualizer.dll files to the C:\Program Files\Microsoft Visual Studio 9.0\Common7\Packages\Debugger\Visualizers folder to make it available).

var executionPlan = new ExecutionPlan();

var q =
    from i in Enumerable.Range(0, 12).AsDebuggable(executionPlan)
    where i > 5 && i % 2 == 0
    select i.ToString("C");

foreach (var s in q)
    Console.WriteLine(s);
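
To make the interleaved order in the trace easier to reason about, here is a minimal sketch of the hand-written approach that LinqDebugger generalizes. TracedWhere and TracedSelect are hypothetical helpers written for this illustration, not part of LinqDebugger, and the log format is invented. The sketch only shows that nothing runs until the query is enumerated, and that each element travels through the whole pipeline before the next one is pulled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helpers for illustration only; LinqDebugger does not expose these.
static class TracingOperators
{
    public static IEnumerable<T> TracedWhere<T>(
        this IEnumerable<T> source, Func<T, bool> predicate, List<string> log)
    {
        foreach (var item in source)
        {
            var keep = predicate(item);
            log.Add($"Where({item}) => {keep}");
            if (keep)
                yield return item;
        }
    }

    public static IEnumerable<TResult> TracedSelect<T, TResult>(
        this IEnumerable<T> source, Func<T, TResult> selector, List<string> log)
    {
        foreach (var item in source)
        {
            var result = selector(item);
            log.Add($"Select({item}) => {result}");
            yield return result;
        }
    }
}

static class LazyTraceDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Building the query runs no predicate or selector yet.
        var q = Enumerable.Range(0, 12)
            .TracedWhere(i => i > 5 && i % 2 == 0, log)
            .TracedSelect(i => i * 10, log);

        log.Add("query built");

        // Enumeration pulls one element at a time through Where and then Select.
        foreach (var value in q)
            log.Add($"consumed {value}");

        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

The cost of this approach is one tracing method per operator, which is exactly the duplication LinqDebugger is meant to avoid.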


I am not going to describe in detail how LinqDebugger is implemented; you can download the code and check it out yourself. As a hint, I will just show you the signature of the AsDebuggable method. Notice what else you can pass to it: we can choose which operators to trace using the LinqOperators enum type, or pass a TextWriter object (Console.Out is the default TextWriter).

public static class LinqDebuggerExtensions
{
    public static IQueryable<T> AsDebuggable<T>(this IEnumerable<T> source, ExecutionPlan executionPlan,
        LinqOperators linqOperators, TextWriter textWriter)
    { ... }
}

[Flags]
public enum LinqOperators : long
{
    None = 0,
    Aggregate = 1,
    All = 2,
    Any = 4,
    AsQueryable = 8,
    Average = 16,
    Cast = 32,
    Concat = 64,
    Contains = 128,
    ...
}

[Serializable]
public sealed class ExecutionPlan
{
    public Expression Query { get; internal set; }
    public List<Record> Records { get; }
    public void Reset();
}
    
[Serializable]
public class Record
{
    public RecordType RecordType { get; internal set; }
    public MethodCallExpression OperatorCallExpression { get; internal set; }
    public object Result { get; internal set; }
    public LambdaExpression FuncCallExpression { get; internal set; }
    public string FuncCallName { get; internal set; }
    public object[] Arguments { get; internal set; }
}
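
Because LinqOperators is a [Flags] enum whose values are powers of two, several operators can be selected with a bitwise OR and tested with a bitwise AND. The sketch below uses a local stand-in enum, SampleOperators, that mirrors only the members listed above; the ShouldTrace helper is an assumption made for this illustration and says nothing about how LinqDebugger checks the flags internally.

```csharp
using System;

// Local stand-in mirroring only the members shown in the post;
// the remaining members of the real LinqOperators enum are elided there.
[Flags]
enum SampleOperators : long
{
    None = 0,
    Aggregate = 1,
    All = 2,
    Any = 4,
    AsQueryable = 8,
    Average = 16,
    Cast = 32,
    Concat = 64,
    Contains = 128,
}

static class FlagsDemo
{
    // Hypothetical helper: true when every bit of 'op' is set in 'selected'.
    public static bool ShouldTrace(SampleOperators selected, SampleOperators op)
    {
        return op != SampleOperators.None && (selected & op) == op;
    }

    static void Main()
    {
        // Select two operators with a bitwise OR.
        var selected = SampleOperators.Any | SampleOperators.Concat;

        Console.WriteLine(selected);
        Console.WriteLine(ShouldTrace(selected, SampleOperators.Any));
        Console.WriteLine(ShouldTrace(selected, SampleOperators.Cast));
    }
}
```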

Tracing similar information during the execution of Rx queries is even more useful, because in most cases such queries are executed asynchronously, which makes them really hard to debug. Let’s debug the Rx version of the previous query using RxDebugger:

var q =
    from i in Enumerable.Range(0, 12).ToObservable().AsDebuggable(
        new DebugSettings {SourceName = "range", Logger = DebugSettings.ConsoleLogger})
    where i > 5 && i % 2 == 0
    select i.ToString("C");

q.Run(Console.WriteLine);


range.Where.Select.Subscribe()
range.Where.Subscribe()
range.Subscribe()
range.OnNext(0)
range.OnNext(1)
range.OnNext(2)
range.OnNext(3)
range.OnNext(4)
range.OnNext(5)
6,00 zł
range.OnNext(6)
range.Where.OnNext(6)
range.Where.Select.OnNext(6,00 zł)
range.OnNext(7)
8,00 zł
range.OnNext(8)
range.Where.OnNext(8)
range.Where.Select.OnNext(8,00 zł)
range.OnNext(9)
10,00 zł
range.OnNext(10)
range.Where.OnNext(10)
range.Where.Select.OnNext(10,00 zł)
range.OnNext(11)
range.OnCompleted()
range.Where.OnCompleted()
range.Where.Select.OnCompleted()
range.Where.Select.Dispose()
range.Where.Dispose()
range.Dispose()

As you can see, this time quite different information is displayed on the screen, and there is no VS visualizer available. That is because the implementation of RxDebugger is totally different from that of LinqDebugger. But there are some additional features too. To explain how RxDebugger works, I will show you the Debug method, which lets us trace a single observable object instead of the whole Rx query.

var q =
    from i in Enumerable.Range(0, 12).ToObservable().Debug(
        new DebugSettings {SourceName = "range", Logger = DebugSettings.ConsoleLogger})
    where i > 5 && i % 2 == 0
    select i.ToString("C");

q.Run(Console.WriteLine);

range.Subscribe()
range.OnNext(0)
range.OnNext(1)
range.OnNext(2)
range.OnNext(3)
range.OnNext(4)
range.OnNext(5)
6,00 zł
range.OnNext(6)
range.OnNext(7)
8,00 zł
range.OnNext(8)
range.OnNext(9)
10,00 zł
range.OnNext(10)
range.OnNext(11)
range.OnCompleted()
range.Dispose()

 

public static IObservable<T> Debug<T>(this IObservable<T> source, DebugSettings settings, Func<T, object> valueSelector)
{
    Action<T> onNext = v => { };
    if ((settings.NotificationFilter & NotificationFilter.OnNext) == NotificationFilter.OnNext)
        onNext = v => 
        { 
            if (settings.Logger != null) 
                settings.LoggerScheduler.Schedule(() => settings.Logger(DebugEntry.Create(settings, NotificationFilter.OnNext, valueSelector(v)))); 
        };
        
    Action<Exception> onError = ... ;
    Action onCompleted = ... ;
    Action subscribe = ... ;
    Action dispose = ... ;
    
    return Observable.CreateWithDisposable<T>(o =>
    {
        var newObserver = Observer.Create<T>
        (
            v => { onNext(v); o.OnNext(v); },
            e => { onError(e); o.OnError(e); },
            () => { onCompleted(); o.OnCompleted(); }
        );

        subscribe();
        var disposable = source.Subscribe(newObserver);

        return Disposable.Create(() =>
        {
            dispose();
            disposable.Dispose();
        });
    });
}

public sealed class DebugSettings
{
    // defaults 
    public static Action<DebugEntry> DefaultLogger { get; set; }
    public static IScheduler DefaultLoggerSchduler { get; set; }       
    public static string DefaultMessageFormat { get; set; }

    //loggers
    public static Action<DebugEntry> ConsoleLogger { get; private set; }
    public static Action<DebugEntry> DebugLogger { get; private set; }
   
    public string MessageFormat { get; set; }
    public Action<DebugEntry> Logger { get; set; }
    public IScheduler LoggerScheduler { get; set; }
    public string SourceName { get; set; }
    public NotificationFilter NotificationFilter { get; set; }
    public OperatorFilter OperatorFilter { get; set; }

    static DebugSettings()
    {
        ConsoleLogger = n => Console.WriteLine(n.FormattedMessage);
        DebugLogger = n => Debug.WriteLine(n.FormattedMessage);

        DefaultLogger = DebugLogger;
        DefaultLoggerSchduler = Scheduler.CurrentThread;
        DefaultMessageFormat = "{0}.{1}({2})";
    }

    public DebugSettings()
    {
        SourceName = "";
        NotificationFilter = NotificationFilter.All;
        OperatorFilter = OperatorFilter.AllOperators;

        Logger = DefaultLogger;
        LoggerScheduler = DefaultLoggerSchduler;
        MessageFormat = DefaultMessageFormat;
    }
}

public class DebugEntry
{
    public string SourceName { get; set; }
    public string FormattedMessage { get; set; }
    public NotificationFilter Kind { get; set; }
    public Exception Exception { get; set; }
    public object Value { get; set; }
}
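
Stripped of settings, filters and schedulers, the wrapping performed by the Debug method above can be sketched with nothing but the IObservable<T> and IObserver<T> interfaces from the base class library. Every type in this sketch (ActionDisposable, ActionObserver, ArraySource, TracedObservable) is a hypothetical stand-in written for illustration, not RxDebugger or Rx code, and the logger is called synchronously instead of being scheduled.

```csharp
using System;
using System.Collections.Generic;

sealed class ActionDisposable : IDisposable
{
    private readonly Action _dispose;
    public ActionDisposable(Action dispose) { _dispose = dispose; }
    public void Dispose() { _dispose(); }
}

sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public ActionObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext; _onError = onError; _onCompleted = onCompleted;
    }

    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { _onCompleted(); }
}

// Minimal synchronous source so the sketch does not depend on the Rx assemblies.
sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] _values;
    public ArraySource(params T[] values) { _values = values; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var value in _values) observer.OnNext(value);
        observer.OnCompleted();
        return new ActionDisposable(() => { });
    }
}

// Simplified counterpart of Debug: logs each call, then forwards it unchanged.
sealed class TracedObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly string _name;
    private readonly Action<string> _log;

    public TracedObservable(IObservable<T> source, string name, Action<string> log)
    {
        _source = source; _name = name; _log = log;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _log(_name + ".Subscribe()");
        var inner = _source.Subscribe(new ActionObserver<T>(
            v => { _log(_name + ".OnNext(" + v + ")"); observer.OnNext(v); },
            e => { _log(_name + ".OnError(" + e.Message + ")"); observer.OnError(e); },
            () => { _log(_name + ".OnCompleted()"); observer.OnCompleted(); }));

        return new ActionDisposable(() => { _log(_name + ".Dispose()"); inner.Dispose(); });
    }
}

static class TraceDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var traced = new TracedObservable<int>(new ArraySource<int>(1, 2), "range", log.Add);

        // Disposing the subscription at the end of the using block logs the Dispose call.
        using (traced.Subscribe(new ActionObserver<int>(v => log.Add("got " + v), e => { }, () => { })))
        {
        }
        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```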

The Debug method creates a new observable object on top of the given observable source. Each observer passed to this new observable is wrapped in another observer that traces calls to the Subscribe and Dispose methods at the IObservable level and to the OnNext, OnError and OnCompleted methods at the IObserver level. We can filter by Rx operator (the OperatorFilter enum type) or by the kind of logged information (the NotificationFilter enum type). In the LinqDebugger project the TextWriter class is used to log information. Here we have a much more flexible solution, because we can pass a delegate responsible for storing the logged information. RxDebugger provides standard loggers such as DebugSettings.ConsoleLogger and DebugSettings.DebugLogger, but we can also set our own delegate or even combine several logger delegates. Such a scenario is presented later in the post. Now that we know how the Debug method works, let’s reveal the secret behind the AsDebuggable method.

public interface IDebuggedObservable<T> : IObservable<T>
{
    DebugSettings Settings { get; }
}
    
public static partial class RxDebuggerExtensions
{    
    public static IDebuggedObservable<T> AsDebuggable<T>(this IObservable<T> source, DebugSettings settings)
    {
        return new DebuggedObservable<T>(source.Debug(settings), settings);
    }
        
    public static IDebuggedObservable<TSource> Where<TSource>(this IDebuggedObservable<TSource> source , Func<TSource,bool> predicate)
    {
        var settings = source.Settings.Copy();
        settings.SourceName = settings.SourceName +  ".Where";
        return new DebuggedObservable<TSource>((source as IObservable<TSource>).Where<TSource>(predicate).Debug(settings), settings);
    }
    public static IDebuggedObservable<TResult> Select<TSource,TResult>(this IDebuggedObservable<TSource> source , Func<TSource,TResult> selector)
    {
        var settings = source.Settings.Copy();
        settings.SourceName = settings.SourceName +  ".Select";
        return new DebuggedObservable<TResult>((source as IObservable<TSource>).Select<TSource,TResult>(selector).Debug(settings), settings);
    } 
    ... 
    
    private class DebuggedObservable<T> : IDebuggedObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly DebugSettings _settings;
        public DebugSettings Settings { get { return _settings; } }

        public DebuggedObservable(IObservable<T> source, DebugSettings settings)
        {
            _source = source;
            _settings = settings;
        }
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return _source.Subscribe(observer);
        }            
    }
}
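
The trick in the listing above is that the compiler prefers an extension method declared on the more specific interface, so once a query starts with AsDebuggable, every following where and select binds to the debugging overloads. The sketch below demonstrates the same binding with IEnumerable<T> instead of IObservable<T>, purely to stay short and free of Rx dependencies. INamedSequence, NamedSequence and AsNamed are hypothetical names invented for this illustration, and only the name propagation is shown, not the tracing.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Hypothetical marker interface, playing the role of IDebuggedObservable<T>.
interface INamedSequence<T> : IEnumerable<T>
{
    string Name { get; }
}

sealed class NamedSequence<T> : INamedSequence<T>
{
    private readonly IEnumerable<T> _source;
    public string Name { get; }

    public NamedSequence(IEnumerable<T> source, string name)
    {
        _source = source;
        Name = name;
    }

    public IEnumerator<T> GetEnumerator() => _source.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

static class NamedSequenceExtensions
{
    public static INamedSequence<T> AsNamed<T>(this IEnumerable<T> source, string name)
        => new NamedSequence<T>(source, name);

    // Picked instead of Enumerable.Where when the source is statically an INamedSequence<T>.
    public static INamedSequence<T> Where<T>(
        this INamedSequence<T> source, Func<T, bool> predicate)
        => new NamedSequence<T>(Enumerable.Where(source, predicate), source.Name + ".Where");

    // Picked instead of Enumerable.Select for the same reason.
    public static INamedSequence<TResult> Select<T, TResult>(
        this INamedSequence<T> source, Func<T, TResult> selector)
        => new NamedSequence<TResult>(Enumerable.Select(source, selector), source.Name + ".Select");
}

static class NamePropagationDemo
{
    public static INamedSequence<string> Build()
    {
        // The query syntax translates to the Where and Select overloads above,
        // so the result is still an INamedSequence and carries the composed name.
        return from i in Enumerable.Range(0, 12).AsNamed("range")
               where i > 5 && i % 2 == 0
               select i.ToString();
    }

    static void Main()
    {
        var q = Build();
        Console.WriteLine(q.Name);
        Console.WriteLine(string.Join(",", q));
    }
}
```

The composed name has the same shape as the prefixes in the RxDebugger output shown earlier, such as range.Where.Select.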

Of course I didn’t implement the extension methods for all Rx operators manually; I wrote a T4 template that generates the appropriate extension methods (currently 127 methods in the .NET version and 125 methods in the Silverlight version :) ). The best way to use RxDebugger in your projects is to add the T4 template to the project you are working on and run it every time you change the version of Rx. This keeps the generated methods synchronized with the Rx DLLs. Finally, let’s see how to use RxDebugger in a Silverlight application.

<UserControl x:Class="Blog.SL.Post016.RxDebuggerTest"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation" 
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    xmlns:d="http://schemas.microsoft.com/expression/blend/2008" 
    xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" 
    mc:Ignorable="d" d:DesignWidth="640" d:DesignHeight="480">
    <StackPanel>
        <TextBox x:Name="input"/>
        <TextBox x:Name="output"/>
        <ItemsControl x:Name="log"/>
    </StackPanel>
</UserControl>

 

public partial class RxDebuggerTest : UserControl
{
    public RxDebuggerTest()
    {
        InitializeComponent();

        var entries = new ObservableCollection<DebugEntry>();
        log.ItemsSource = entries;

        var q = input
            .GetObservableTextChanged()
            .Select(e => ((TextBox)e.Sender).Text)
            .AsDebuggable(new DebugSettings
                              {
                                  SourceName = "textChanged", 
                                  LoggerScheduler = Scheduler.Dispatcher, 
                                  Logger = DebugSettings.DebugLogger + entries.Add,                                      
                              })
            .Throttle(TimeSpan.FromSeconds(2))
            .Select(t => new string(t.Reverse().ToArray()));

        q.ObserveOnDispatcher().Subscribe(t => output.Text = t);
    }
}
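
The Logger assignment in the listing above relies on delegate combination: adding two Action delegates with + produces a multicast delegate that calls both targets in order. Here is a minimal standalone sketch of that mechanism. It uses Action<string> and plain lists instead of DebugEntry and the ObservableCollection, and the variable names are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;

static class CombinedLoggerDemo
{
    public static List<string> Run()
    {
        var first = new List<string>();
        var second = new List<string>();

        Action<string> toFirst = first.Add;
        Action<string> toSecond = message => second.Add(message.ToUpperInvariant());

        // '+' on delegates builds a multicast delegate; targets run in the order added.
        Action<string> logger = toFirst + toSecond;
        logger("textChanged.OnNext(abc)");

        // Collect what each target received, first target first.
        var result = new List<string>(first);
        result.AddRange(second);
        return result;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

A single invocation of the combined delegate reaches both targets, which is how one DebugEntry can be written to the debug output and shown in the ItemsControl at the same time.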


And that’s it for this post. I encourage you to download and play with the two simple tools I provided here. They can be very helpful in debugging LINQ queries or in learning about the internals of LINQ and Rx.
download (Rx versions: .Net3.5 v1.0.2698.0 and SL3 v1.0.2698.0) Always check for the newest version at the beginning of the post.