Showing posts with label Linq. Show all posts

Friday, February 10, 2012

RIA Services (and Raven DB) presentation

This presentation is an introduction to the RIA Services framework. Most tutorials show how to integrate RIA Services with a relational database using DAL frameworks such as LINQ to SQL, LINQ to Entities or NHibernate. This presentation shows how to integrate RIA Services with the document database Raven DB. Source code and slides can be found here.

Friday, December 23, 2011

LINQ presentation

Today I gave my old (2009) presentation about LINQ. The video can be found here; the slides and all code samples presented during the talk can be downloaded from here. I hope you find it useful.

Sunday, September 4, 2011

Programming with C# asynchronous sequences

Tomas Petricek, in his latest blog post titled “Programming with F# asynchronous sequences”, presents an F# implementation of something called asynchronous sequences. In this post I will show you how the same concept can be implemented in C#. Let’s look at the sample code below to better understand what an asynchronous sequence is:

IEnumerable<...> AsyncSeq()
{
    yield return "Hello";
    await TaskEx.Delay(100);
    yield return "world!";
}

An asynchronous sequence is code that produces a sequence of values generated on demand (this is how the IEnumerable interface can be interpreted) but that additionally does some asynchronous work during evaluation (the await keyword). Every time the client of an asynchronous sequence calls the MoveNext method, the next value is evaluated. The key feature here is that the client decides when to produce the next value and when to stop processing.

There are two problems with such an implementation of an asynchronous sequence. First, sequences in the .NET world are represented by the IEnumerable interface, but that interface allows only synchronous processing. Since the MoveNext method returns a bool, we need to decide immediately whether the next value can be produced or not. With asynchronous processing it can take a few minutes or even hours to provide that information. The second problem is that so far we cannot mix the await keyword (Async CTP) with yield return/yield break inside the same method body. My solution resolves both problems, and the above sequence can be implemented the following way:

IEnumerable<AsyncSeqItem<string>> AsyncSeq()
{
    yield return "Hello";
    yield return TaskEx.Delay(100);
    yield return "world!";
}

public enum AsyncSeqItemMode
{
    Value, Task, Sequence
}

public class AsyncSeqItem<T>
{
    public AsyncSeqItemMode Mode { get; private set; }
    public T Value { get; private set; }
    public Task Task { get; private set; }
    public IEnumerable<AsyncSeqItem<T>> Seq { get; private set; }

    public AsyncSeqItem(T value)
    {
        Value = value;
        Mode = AsyncSeqItemMode.Value;
    }

    public AsyncSeqItem(Task task)
    {
        Task = task;
        Mode = AsyncSeqItemMode.Task;
    }

    public AsyncSeqItem(IEnumerable<AsyncSeqItem<T>> seq)
    {
        Seq = seq;
        Mode = AsyncSeqItemMode.Sequence;
    }

    public static implicit operator AsyncSeqItem<T>(T value)
    {
        return new AsyncSeqItem<T>(value);
    }

    public static implicit operator AsyncSeqItem<T>(Task task)
    {
        return new AsyncSeqItem<T>(task);
    }
}

AsyncSeqItem represents one of the following three values:

  • Value – the next value generated by the sequence
  • Task – some asynchronous work that needs to be awaited before going forward
  • Sequence – used with recursive calls; it means that we want to use tail recursion

There are two ways of consuming such a sequence in the client:
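To make the Sequence mode more concrete, here is a minimal sketch of the tail-recursion idea. It is not the code from the download: Item<T> is a trimmed-down stand-in for AsyncSeqItem<T> (the Task mode is left out), and CountFrom and Drain are hypothetical names introduced only for this illustration. The driver is synchronous to keep the example short.

```csharp
using System;
using System.Collections.Generic;

// Trimmed-down stand-in for the post's AsyncSeqItem<T>: only Value and Sequence modes.
public enum ItemMode { Value, Sequence }

public sealed class Item<T>
{
    public ItemMode Mode { get; private set; }
    public T Value { get; private set; }
    public IEnumerable<Item<T>> Seq { get; private set; }

    public Item(T value) { Value = value; Mode = ItemMode.Value; }
    public Item(IEnumerable<Item<T>> seq) { Seq = seq; Mode = ItemMode.Sequence; }
}

public static class TailCallDemo
{
    // Yields n, then hands the rest of the work to a fresh iterator as its last step.
    public static IEnumerable<Item<int>> CountFrom(int n, int limit)
    {
        if (n > limit) yield break;
        yield return new Item<int>(n);
        yield return new Item<int>(CountFrom(n + 1, limit)); // the "tail call"
    }

    // Hypothetical synchronous driver: on a Sequence item it replaces the current
    // enumerator instead of nesting into it, so the chain of iterators never grows.
    public static List<T> Drain<T>(IEnumerable<Item<T>> seq)
    {
        var result = new List<T>();
        var enumerator = seq.GetEnumerator();
        while (enumerator.MoveNext())
        {
            var item = enumerator.Current;
            if (item.Mode == ItemMode.Value)
            {
                result.Add(item.Value);
            }
            else
            {
                enumerator.Dispose();
                enumerator = item.Seq.GetEnumerator();
            }
        }
        enumerator.Dispose();
        return result;
    }
}
```

Calling TailCallDemo.Drain(TailCallDemo.CountFrom(1, 3)) collects 1, 2 and 3. With a nested foreach inside the iterator, each level of recursion would keep its parent enumerator alive; swapping the enumerator avoids that.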

public static class AsyncSeqExtensions
{
    public static IEnumerable<Task<Option<T>>> ToTaskEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    { ... }

    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    { ... }
}

public class Option<T>
{
    public T Value { get; private set; }
    public bool HasValue { get; private set; }

    public Option()
    {
        HasValue = false;
    }

    public Option(T value)
    {
        Value = value;
        HasValue = true;
    }

    public static implicit operator Option<T>(T value)
    {
        return new Option<T>(value);
    }
}

In the first approach we call the ToTaskEnumerable extension method, which returns a sequence of tasks. Each task wraps a special type called Option<T>, which can be used similarly to Nullable<T> except that it works with both value and reference types. Returning a task whose option object has no value means that we have reached the end of the sequence. I also provide a few standard LINQ operators built on top of these sequence semantics:

public static class AsyncSeqExtensions
{
    async private static Task ForEachTaskImpl<T>(this IEnumerable<Task<Option<T>>> seq, Action<Task<Option<T>>> action)
    {
        foreach (var task in seq)
        {
            await task;
            action(task);
        }
    }
    public static Task ForEachTask<T>(this IEnumerable<Task<Option<T>>> seq, Action<Task<Option<T>>> action)
    {
        return ForEachTaskImpl(seq, action);
    }

    public static Task ForEach<T>(this IEnumerable<Task<Option<T>>> seq, Action<T> action)
    {
        return seq.ForEachTask(task =>
        {
            if (task.Result.HasValue)
                action(task.Result.Value);
        });
    }

    async private static Task<T[]> ToArrayImpl<T>(IEnumerable<Task<Option<T>>> seq)
    {
        var list = new List<T>();
        await seq.ForEach(v => list.Add(v));
        return list.ToArray();
    }
    public static Task<T[]> ToArray<T>(this IEnumerable<Task<Option<T>>> seq)
    {
        return ToArrayImpl(seq);
    }

    public static IEnumerable<Task<Option<TResult>>> Select<T, TResult>(this IEnumerable<Task<Option<T>>> source,
        Func<T,TResult> selector) { ... }

    public static IEnumerable<Task<Option<T>>> Where<T>(this IEnumerable<Task<Option<T>>> source,
        Func<T, bool> predicate) { ... }

    public static IEnumerable<Task<Option<T>>> Take<T>(this IEnumerable<Task<Option<T>>> source,
        int count) { ... }

    ...
}
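The end-of-sequence convention (a final task whose Option<T> has no value) can be illustrated with a hand-written sequence. This is only a sketch: Numbers, DelayedValue and CollectAsync are hypothetical names, and it assumes Task.FromResult and Task.Delay from .NET 4.5 rather than the TaskEx helpers of the Async CTP used in the post.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Same shape as the Option<T> type shown in the post.
public class Option<T>
{
    public T Value { get; private set; }
    public bool HasValue { get; private set; }

    public Option() { HasValue = false; }
    public Option(T value) { Value = value; HasValue = true; }
}

public static class OptionDemo
{
    // Hand-written stand-in for the output of ToTaskEnumerable:
    // two values followed by the "no value" end marker.
    public static IEnumerable<Task<Option<int>>> Numbers()
    {
        yield return Task.FromResult(new Option<int>(1));
        yield return DelayedValue(2);
        yield return Task.FromResult(new Option<int>()); // end of sequence
    }

    private static async Task<Option<int>> DelayedValue(int value)
    {
        await Task.Delay(10); // simulated asynchronous work
        return new Option<int>(value);
    }

    // Awaits each task in turn and stops at the first option without a value.
    public static async Task<List<int>> CollectAsync(IEnumerable<Task<Option<int>>> seq)
    {
        var result = new List<int>();
        foreach (var task in seq)
        {
            var option = await task;
            if (!option.HasValue) break;
            result.Add(option.Value);
        }
        return result;
    }
}
```

The client still drives the iteration: the next task is requested only after the previous one has completed, which is the on-demand behaviour described earlier.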

Returning an additional task object with a special value at the end of a sequence allows us to use the standard IEnumerable<T> interface, but it’s a little inconvenient. In the second approach we use the IAsyncEnumerable interface from the Reactive Framework library released some time ago.

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}

public interface IAsyncEnumerator<out T> : IDisposable
{
    Task<bool> MoveNext();
    T Current { get; }
}

public static class AsyncEnumerable
{
    public static IAsyncEnumerable<TResult> Select<TSource, TResult>(IAsyncEnumerable<TSource> source,
        Func<TSource, TResult> selector) { ... }

    public static IAsyncEnumerable<TSource> Where<TSource>(IAsyncEnumerable<TSource> source,
        Func<TSource, bool> predicate) { ... }

    public static IAsyncEnumerable<TSource> Take<TSource>(IAsyncEnumerable<TSource> source,
        int n) { ... }
}

This interface perfectly represents the semantics of an asynchronous sequence. The Rx library also provides many standard LINQ operators such as Where, Select, Take, Sum, First and so on. This allows us to write almost any LINQ query executing on top of an asynchronous sequence.

Now let’s summarize what we have achieved so far. We can write imperative code implementing an asynchronous sequence. We can use an extension method to create one of two asynchronous sequence representations. Finally, we can iterate through all items in such a sequence, or we can build a new sequence object using LINQ operators.
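As a rough illustration of how a client consumes this pair of interfaces, the sketch below redeclares them with the same shape as above inside a namespace of its own (AsyncSeqSketch, chosen so they are not confused with the C# 8 types of the same name). DelayedRange and Consumer.ToListAsync are hypothetical helpers, and Task.Delay assumes .NET 4.5.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace AsyncSeqSketch
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetEnumerator();
    }

    public interface IAsyncEnumerator<out T> : IDisposable
    {
        Task<bool> MoveNext();
        T Current { get; }
    }

    // Hypothetical source: counts from 1 to count, pausing before each value.
    public sealed class DelayedRange : IAsyncEnumerable<int>
    {
        private readonly int _count;
        public DelayedRange(int count) { _count = count; }

        public IAsyncEnumerator<int> GetEnumerator() { return new Enumerator(_count); }

        private sealed class Enumerator : IAsyncEnumerator<int>
        {
            private readonly int _count;
            public Enumerator(int count) { _count = count; }

            public int Current { get; private set; }

            public async Task<bool> MoveNext()
            {
                if (Current >= _count) return false; // end of sequence, no extra marker item
                await Task.Delay(10);                // simulated asynchronous work
                Current++;
                return true;
            }

            public void Dispose() { }
        }
    }

    public static class Consumer
    {
        // The asynchronous counterpart of a foreach loop.
        public static async Task<List<T>> ToListAsync<T>(IAsyncEnumerable<T> source)
        {
            var result = new List<T>();
            using (var e = source.GetEnumerator())
            {
                while (await e.MoveNext())
                    result.Add(e.Current);
            }
            return result;
        }
    }
}
```

Because MoveNext returns Task<bool>, the end of the sequence is reported by the awaited result itself, so no special Option<T> marker is needed.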

The C# version of the web crawler presented in Tomas Petricek’s blog post could look like this:

public static class AsyncSeqSample
{
    async public static Task CrawlBingUsingAsyncEnumerable()
    {
        await RandomCrawl("http://news.bing.com")
            .ToAsyncEnumerable()
            .Where(t => !t.Item1.Contains("bing.com"))
            .Select(t => t.Item2)
            .Take(10)
            .ForEach(Console.WriteLine);

        Console.WriteLine("the end...");
    }

    async public static Task CrawlBingUsingTaskEnumerable()
    {
        await RandomCrawl("http://news.bing.com")
            .ToTaskEnumerable()
            .Where(t => !t.Item1.Contains("bing.com"))
            .Select(t => t.Item2)
            .Take(10)
            .ForEach(Console.WriteLine);

        Console.WriteLine("the end...");
    }

    public static IEnumerable<AsyncSeqItem<Tuple<string, string>>> RandomCrawl(string url)
    {
        return RandomCrawlLoop(url, new HashSet<string>());
    }

    private static IEnumerable<AsyncSeqItem<Tuple<string,string>>> RandomCrawlLoop(string url, HashSet<string> visited)
    {
        if (visited.Add(url))
        {
            var downloadTask = DownloadDocument(url);
            yield return downloadTask;
            if (downloadTask.Result.HasValue)
            {
                var doc = downloadTask.Result.Value;
                yield return Tuple.Create(url, GetTitle(doc));
                foreach (var link in ExtractLinks(doc))
                {
                    foreach (var l in RandomCrawlLoop(link, visited))
                    {
                        yield return l;
                    }
                }
            }
        }
    }

    private static string[] ExtractLinks(HtmlDocument doc)
    {
        try
        {
            var q = from a in doc.DocumentNode.SelectNodes("//a")
                    where a.Attributes.Contains("href")
                    let href = a.Attributes["href"].Value
                    where href.StartsWith("http://")
                    let endl = href.IndexOf('?')
                    select endl > 0 ? href.Substring(0, endl) : href;

            return q.ToArray();
        }
        catch
        {
            return new string[0];
        }
    }

    async private static Task<Option<HtmlDocument>> DownloadDocument(string url)
    {
        try
        {
            var client = new WebClient();
            var html = await client.DownloadStringTaskAsync(url);
            var doc = new HtmlDocument();
            doc.LoadHtml(html);
            return new Option<HtmlDocument>(doc);
        }
        catch (Exception)
        {
            return new Option<HtmlDocument>();
        }
    }

    private static string GetTitle(HtmlDocument doc)
    {
        var title = doc.DocumentNode.SelectSingleNode("//title");
        return title != null ? title.InnerText.Trim() : "Untitled";
    }
}

Now let’s see how the ToAsyncEnumerable and ToTaskEnumerable methods have been implemented:

public static class AsyncSeqExtensions
{
    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    {
        if (seq == null) throw new ArgumentNullException("seq");

        return new AnonymousAsyncEnumerable<T>(() =>
        {
            var enumerator = seq.ToTaskEnumerable(continueOnCapturedContext).GetEnumerator();
            seq = null; // holding reference to seq parameter introduces memory leaks when asynchronous sequence uses recursive calls
            TaskCompletionSource<bool> currentTcs = null;
            Task<Option<T>> currentTask = null;

            return new AnonymousAsyncEnumerator<T>(
                () =>
                {
                    currentTcs = new TaskCompletionSource<bool>();

                    if (CheckEndOfSeq(currentTask) == false)
                    {
                        currentTcs.SetResult(false);
                        return currentTcs.Task;
                    }

                    enumerator.MoveNext();

                    enumerator.Current.ContinueWith(t =>
                    {
                        if (t.IsFaulted)
                        {
                            currentTcs.SetException(t.Exception);
                        }
                        else
                        {
                            if (!t.Result.HasValue)
                            {
                                currentTcs.SetResult(false);
                            }
                            else
                            {
                                currentTask = t;
                                currentTcs.SetResult(true);
                            }
                        }
                    });

                    return currentTcs.Task;
                },
                () => currentTask.Result.Value
            );
        });
    }

    public static IEnumerable<Task<Option<T>>> ToTaskEnumerable<T>(this IEnumerable<AsyncSeqItem<T>> seq, bool continueOnCapturedContext = true)
    {
        if (seq == null) throw new ArgumentNullException("seq");

        return new AnonymousEnumerable<Task<Option<T>>>(() =>
        {
            var synchronizationContext = continueOnCapturedContext ? SynchronizationContext.Current : null;

            var enumerator = seq.GetEnumerator();
            seq = null; // holding reference to seq parameter introduces memory leaks when asynchronous sequence uses recursive calls

            TaskCompletionSource<Option<T>> currentTcs = null;

            return new AnonymousEnumerator<Task<Option<T>>>(
                () =>
                {
                    if (CheckEndOfSeq(currentTcs) == false)
                        return false;

                    currentTcs = new TaskCompletionSource<Option<T>>();

                    Action moveNext = null;
                    moveNext = () =>
                    {
                        Start:

                        bool b;

                        try
                        {
                            b = enumerator.MoveNext();
                        }
                        catch (Exception exception)
                        {
                            currentTcs.SetException(exception);
                            return;
                        }

                        if (b == false)
                        {
                            currentTcs.SetResult(new Option<T>());
                        }
                        else
                        {
                            var c = enumerator.Current;
                            if (c.Mode == AsyncSeqItemMode.Value)
                            {
                                currentTcs.SetResult(c.Value);
                            }
                            else if (c.Mode == AsyncSeqItemMode.Task)
                            {
                                if (synchronizationContext != null)
                                    c.Task.ContinueWith(_ => synchronizationContext.Post(s => ((Action)s)(), moveNext));
                                else
                                    c.Task.ContinueWith(_ => moveNext());
                            }
                            else if (c.Mode == AsyncSeqItemMode.Sequence)
                            {
                                enumerator = c.Seq.GetEnumerator();
                                goto Start;
                            }
                        }
                    };

                    moveNext();

                    return true;
                },
                () => currentTcs.Task
            );
        });
    }
}

As you can see, the implementation is really simple, but the whole concept of asynchronous sequences is very powerful.

Download

Monday, October 25, 2010

Debugging Reactive Framework (RxDebugger) and Linq to objects (LinqDebugger)

[New version (2011.05.06)]

[Project download has been upgraded to the newest version of Rx (Build v1.0.2787.0) and VS2010 ] download
(Changes: projects converted to VS2010, sample project for RxDebugger added)

In this post I will present two projects: LinqDebugger and RxDebugger. A few months ago, after reading Bart De Smet’s great post about tracing the execution of Linq to objects queries, I was wondering if it would be possible to implement the same concept in a more general way. If we want to trace the execution of all of the Linq operators using the described approach, we would have to implement many extension methods, one for each Linq operator. How to avoid this? LinqDebugger is the answer ;). If you are wondering what lazy evaluation is and how to debug Linq to objects queries, this project can be very useful. Let’s look at a very simple query:

var q =
    from i in Enumerable.Range(0, 12)
    where i > 5 && i % 2 == 0
    select i.ToString("C");

foreach (var s in q)
    Console.WriteLine(s);

Now let’s debug that query:

var q =
    from i in Enumerable.Range(0, 12).AsDebuggable()
    where i > 5 && i % 2 == 0
    select i.ToString("C");

foreach (var s in q)
    Console.WriteLine(s); 

After running this code you will find the following text on the console:

Select creation
Select begin
Where creation
Where begin
 predicate i => ((i > 5) && ((i % 2) = 0)) : (0) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (1) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (2) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (3) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (4) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (5) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (6) => True
Where end 6
 selector i => i.ToString("C") : (6) => 6,00 zł
Select end 6,00 zł
6,00 zł
Select begin
Where begin
 predicate i => ((i > 5) && ((i % 2) = 0)) : (7) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (8) => True
Where end 8
 selector i => i.ToString("C") : (8) => 8,00 zł
Select end 8,00 zł
8,00 zł
Select begin
Where begin
 predicate i => ((i > 5) && ((i % 2) = 0)) : (9) => False
 predicate i => ((i > 5) && ((i % 2) = 0)) : (10) => True
Where end 10
 selector i => i.ToString("C") : (10) => 10,00 zł
Select end 10,00 zł
10,00 zł
Select begin
Where begin
 predicate i => ((i > 5) && ((i % 2) = 0)) : (11) => False
Where end (no result)
Select end (no result)

This text shows how the query has been executed. You can find there information about the creation of enumerator objects, about data passing from one enumerator to another, and about the execution of all functions used inside the query. One thing worth mentioning is that everything is presented in the same order as it was executed. Having this information, we can easily figure out, for example, in which iteration an exception was thrown and what values were being processed by the query at that moment. If such a text representation is hard for you to read, I also provide a VS visualizer for the ExecutionPlan type that presents the same information in a tree control (copy the LinqDebugger.dll and LinqDebugger.Visualizer.dll files to the C:\Program Files\Microsoft Visual Studio 9.0\Common7\Packages\Debugger\Visualizers folder to make it available).

var executionPlan = new ExecutionPlan();

var q =
    from i in Enumerable.Range(0, 12).AsDebuggable(executionPlan)
    where i > 5 && i % 2 == 0
    select i.ToString("C");

foreach (var s in q)
    Console.WriteLine(s);

image
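The interleaving visible in the trace (predicate calls, then a selector call, then the value reaching the loop) can also be reproduced by hand, without LinqDebugger, by logging from inside the lambdas. This is only a sketch of lazy evaluation in plain LINQ to objects; LazyTrace and the log format are hypothetical and much cruder than what the tool records.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LazyTrace
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Building the query runs neither the predicate nor the selector.
        var q = Enumerable.Range(0, 12)
            .Where(i => { log.Add("predicate " + i); return i > 5 && i % 2 == 0; })
            .Select(i => { log.Add("selector " + i); return i * 10; });

        log.Add("query built");

        // Each iteration pulls one result through Select, which pulls from Where.
        foreach (var v in q)
            log.Add("result " + v);

        return log;
    }
}
```

The first entry in the log is "query built", and "selector 6" appears right after "predicate 6" and before "predicate 7", which matches the order shown in the LinqDebugger output.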

I am not going to describe in detail how LinqDebugger has been implemented; you can download the code and check it out yourself. As a hint I will just show you the signature of the AsDebuggable method. Please notice what else you can pass to that method. We can choose which operators we want to trace using the LinqOperators enum type, or pass a TextWriter object (Console.Out is the default TextWriter).

public static class LinqDebuggerExtensions
{
    public static IQueryable<T> AsDebuggable<T>(this IEnumerable<T> source, ExecutionPlan executionPlan,
        LinqOperators linqOperators, TextWriter textWriter)
    { ... }
}

[Flags]
public enum LinqOperators : long
{
    None = 0,
    Aggregate = 1,
    All = 2,
    Any = 4,
    AsQueryable = 8,
    Average = 16,
    Cast = 32,
    Concat = 64,
    Contains = 128,
    ...
}

[Serializable]
public sealed class ExecutionPlan
{
    public Expression Query { get; internal set; }
    public List<Record> Records { get; }
    public void Reset();
}
    
[Serializable]
public class Record
{
    public RecordType RecordType { get; internal set; }
    public MethodCallExpression OperatorCallExpression { get; internal set; }
    public object Result { get; internal set; }
    public LambdaExpression FuncCallExpression { get; internal set; }
    public string FuncCallName { get; internal set; }
    public object[] Arguments { get; internal set; }
}

Tracing similar information during the execution of Rx queries is even more useful because in most cases such queries are executed asynchronously, so debugging them is really hard. Let’s debug the Rx version of the previous query using RxDebugger:

var q =
    from i in Enumerable.Range(0, 12).ToObservable().AsDebuggable(
        new DebugSettings {SourceName = "range", Logger = DebugSettings.ConsoleLogger})
    where i > 5 && i % 2 == 0
    select i.ToString("C");

q.Run(Console.WriteLine);


range.Where.Select.Subscribe()
range.Where.Subscribe()
range.Subscribe()
range.OnNext(0)
range.OnNext(1)
range.OnNext(2)
range.OnNext(3)
range.OnNext(4)
range.OnNext(5)
6,00 zł
range.OnNext(6)
range.Where.OnNext(6)
range.Where.Select.OnNext(6,00 zł)
range.OnNext(7)
8,00 zł
range.OnNext(8)
range.Where.OnNext(8)
range.Where.Select.OnNext(8,00 zł)
range.OnNext(9)
10,00 zł
range.OnNext(10)
range.Where.OnNext(10)
range.Where.Select.OnNext(10,00 zł)
range.OnNext(11)
range.OnCompleted()
range.Where.OnCompleted()
range.Where.Select.OnCompleted()
range.Where.Select.Dispose()
range.Where.Dispose()
range.Dispose()

As you can see, this time quite different information is displayed on the screen and there is no VS visualizer available. That is because the implementation of RxDebugger is totally different from LinqDebugger. But there are some additional features too. To understand how RxDebugger works, I will show you the Debug method, which gives us the ability to trace a single observable object instead of the whole Rx query.

var q =
    from i in Enumerable.Range(0, 12).ToObservable().Debug(
        new DebugSettings {SourceName = "range", Logger = DebugSettings.ConsoleLogger})
    where i > 5 && i % 2 == 0
    select i.ToString("C");

q.Run(Console.WriteLine);

range.Subscribe()
range.OnNext(0)
range.OnNext(1)
range.OnNext(2)
range.OnNext(3)
range.OnNext(4)
range.OnNext(5)
6,00 zł
range.OnNext(6)
range.OnNext(7)
8,00 zł
range.OnNext(8)
range.OnNext(9)
10,00 zł
range.OnNext(10)
range.OnNext(11)
range.OnCompleted()
range.Dispose()

 

public static IObservable<T> Debug<T>(this IObservable<T> source, DebugSettings settings, Func<T, object> valueSelector)
{
    Action<T> onNext = v => { };
    if ((settings.NotificationFilter & NotificationFilter.OnNext) == NotificationFilter.OnNext)
        onNext = v => 
        { 
            if (settings.Logger != null) 
                settings.LoggerScheduler.Schedule(() => settings.Logger(DebugEntry.Create(settings, NotificationFilter.OnNext, valueSelector(v)))); 
        };
        
    Action<Exception> onError = ... ;
    Action onCompleted = ... ;
    Action subscribe = ... ;
    Action dispose = ... ;
    
    return Observable.CreateWithDisposable<T>(o =>
    {
        var newObserver = Observer.Create<T>
        (
            v => { onNext(v); o.OnNext(v); },
            e => { onError(e); o.OnError(e); },
            () => { onCompleted(); o.OnCompleted(); }
        );

        subscribe();
        var disposable = source.Subscribe(newObserver);

        return Disposable.Create(() =>
        {
            dispose();
            disposable.Dispose();
        });
    });
}

public sealed class DebugSettings
{
    // defaults 
    public static Action<DebugEntry> DefaultLogger { get; set; }
    public static IScheduler DefaultLoggerSchduler { get; set; }       
    public static string DefaultMessageFormat { get; set; }

    //loggers
    public static Action<DebugEntry> ConsoleLogger { get; private set; }
    public static Action<DebugEntry> DebugLogger { get; private set; }
   
    public string MessageFormat { get; set; }
    public Action<DebugEntry> Logger { get; set; }
    public IScheduler LoggerScheduler { get; set; }
    public string SourceName { get; set; }
    public NotificationFilter NotificationFilter { get; set; }
    public OperatorFilter OperatorFilter { get; set; }

    static DebugSettings()
    {
        ConsoleLogger = n => Console.WriteLine(n.FormattedMessage);
        DebugLogger = n => Debug.WriteLine(n.FormattedMessage);

        DefaultLogger = DebugLogger;
        DefaultLoggerSchduler = Scheduler.CurrentThread;
        DefaultMessageFormat = "{0}.{1}({2})";
    }

    public DebugSettings()
    {
        SourceName = "";
        NotificationFilter = NotificationFilter.All;
        OperatorFilter = OperatorFilter.AllOperators;

        Logger = DefaultLogger;
        LoggerScheduler = DefaultLoggerSchduler;
        MessageFormat = DefaultMessageFormat;
    }
}

public class DebugEntry
{
    public string SourceName { get; set; }
    public string FormattedMessage { get; set; }
    public NotificationFilter Kind { get; set; }
    public Exception Exception { get; set; }
    public object Value { get; set; }
}

The Debug method creates a new observable object on top of the given observable source. Each observer passed to this observable is wrapped in a new observer that traces calls to the Subscribe and Dispose methods at the IObservable level and to the OnNext, OnError and OnCompleted methods at the IObserver level. We can filter by Rx operator (OperatorFilter enum type) or by the kind of logged information (NotificationFilter enum type). In the LinqDebugger project the TextWriter class is used to log information. Here we have a much more flexible solution because we can pass a delegate responsible for storing the logged information. RxDebugger provides standard loggers such as DebugSettings.ConsoleLogger and DebugSettings.DebugLogger, but we can also set our own delegate or even combine many different logger delegates. Such a scenario will be presented later in the post. Now that we know how the Debug method works, let’s reveal the secret behind the AsDebuggable method.

public interface IDebuggedObservable<T> : IObservable<T>
{
    DebugSettings Settings { get; }
}
    
public static partial class RxDebuggerExtensions
{    
    public static IDebuggedObservable<T> AsDebuggable<T>(this IObservable<T> source, DebugSettings settings)
    {
        return new DebuggedObservable<T>(source.Debug(settings), settings);
    }
        
    public static IDebuggedObservable<TSource> Where<TSource>(this IDebuggedObservable<TSource> source , Func<TSource,bool> predicate)
    {
        var settings = source.Settings.Copy();
        settings.SourceName = settings.SourceName +  ".Where";
        return new DebuggedObservable<TSource>((source as IObservable<TSource>).Where<TSource>(predicate).Debug(settings), settings);
    }
    public static IDebuggedObservable<TResult> Select<TSource,TResult>(this IDebuggedObservable<TSource> source , Func<TSource,TResult> selector)
    {
        var settings = source.Settings.Copy();
        settings.SourceName = settings.SourceName +  ".Select";
        return new DebuggedObservable<TResult>((source as IObservable<TSource>).Select<TSource,TResult>(selector).Debug(settings), settings);
    } 
    ... 
    
    private class DebuggedObservable<T> : IDebuggedObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly DebugSettings _settings;
        public DebugSettings Settings { get { return _settings; } }

        public DebuggedObservable(IObservable<T> source, DebugSettings settings)
        {
            _source = source;
            _settings = settings;
        }
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return _source.Subscribe(observer);
        }            
    }
}
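The wrapping idea behind Debug can be sketched using only the IObservable<T> and IObserver<T> interfaces from the base class library, without the Rx assemblies. Everything named here (Trace, TwoValues, NullObserver, TraceDemo) is hypothetical and far simpler than RxDebugger: there are no filters, schedulers or message formats. The demo also shows two logger delegates being combined, similar in spirit to the merging of loggers mentioned above.

```csharp
using System;
using System.Collections.Generic;

public static class TraceSketch
{
    // Wraps source so that every notification is reported to log before being forwarded.
    public static IObservable<T> Trace<T>(this IObservable<T> source, string name, Action<string> log)
    {
        return new TracedObservable<T>(source, name, log);
    }

    private sealed class TracedObservable<T> : IObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly string _name;
        private readonly Action<string> _log;

        public TracedObservable(IObservable<T> source, string name, Action<string> log)
        {
            _source = source; _name = name; _log = log;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            _log(_name + ".Subscribe()");
            var inner = _source.Subscribe(new TracingObserver<T>(observer, _name, _log));
            return new TracingDisposable(inner, _name, _log);
        }
    }

    private sealed class TracingObserver<T> : IObserver<T>
    {
        private readonly IObserver<T> _target;
        private readonly string _name;
        private readonly Action<string> _log;

        public TracingObserver(IObserver<T> target, string name, Action<string> log)
        {
            _target = target; _name = name; _log = log;
        }

        public void OnNext(T value) { _log(_name + ".OnNext(" + value + ")"); _target.OnNext(value); }
        public void OnError(Exception error) { _log(_name + ".OnError()"); _target.OnError(error); }
        public void OnCompleted() { _log(_name + ".OnCompleted()"); _target.OnCompleted(); }
    }

    private sealed class TracingDisposable : IDisposable
    {
        private readonly IDisposable _inner;
        private readonly string _name;
        private readonly Action<string> _log;

        public TracingDisposable(IDisposable inner, string name, Action<string> log)
        {
            _inner = inner; _name = name; _log = log;
        }

        public void Dispose() { _log(_name + ".Dispose()"); _inner.Dispose(); }
    }
}

// Minimal source and sink so the sketch runs on its own.
public sealed class TwoValues : IObservable<int>, IDisposable
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnCompleted();
        return this;
    }

    public void Dispose() { }
}

public sealed class NullObserver : IObserver<int>
{
    public void OnNext(int value) { }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class TraceDemo
{
    public static List<string> Run()
    {
        var lines = new List<string>();
        Action<string> logger = Console.WriteLine; // first logger: the console
        logger += lines.Add;                       // second logger: an in-memory list

        new TwoValues().Trace("src", logger).Subscribe(new NullObserver()).Dispose();
        return lines;
    }
}
```

TraceDemo.Run() records the Subscribe call, both OnNext calls, OnCompleted and finally Dispose, in that order, both on the console and in the returned list.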

Of course I didn’t implement extension methods for all Rx operators manually; I wrote a T4 template generating the appropriate extension methods (currently 127 methods in the .Net version and 125 methods in the Silverlight version :) ). The best way to use RxDebugger in your projects is to add the T4 template to the project you are working on and run the template every time you change the version of Rx. This keeps you synchronized with the Rx dlls. Finally, let’s see how to use RxDebugger in a Silverlight application.

<UserControl x:Class="Blog.SL.Post016.RxDebuggerTest"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation" 
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    xmlns:d="http://schemas.microsoft.com/expression/blend/2008" 
    xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" 
    mc:Ignorable="d" d:DesignWidth="640" d:DesignHeight="480">
    <StackPanel>
        <TextBox x:Name="input"/>
        <TextBox x:Name="output"/>
        <ItemsControl x:Name="log"/>
    </StackPanel>
</UserControl>

 

public partial class RxDebuggerTest : UserControl
{
    public RxDebuggerTest()
    {
        InitializeComponent();

        var entries = new ObservableCollection<DebugEntry>();
        log.ItemsSource = entries;

        var q = input
            .GetObservableTextChanged()
            .Select(e => ((TextBox)e.Sender).Text)
            .AsDebuggable(new DebugSettings
                              {
                                  SourceName = "textChanged", 
                                  LoggerScheduler = Scheduler.Dispatcher, 
                                  Logger = DebugSettings.DebugLogger + entries.Add,                                      
                              })
            .Throttle(TimeSpan.FromSeconds(2))
            .Select(t => new string(t.Reverse().ToArray()));

        q.ObserveOnDispatcher().Subscribe(t => output.Text = t);
    }
}

image

And that’s it for this post. I encourage you to download and play with the two simple tools I provided here. They can be very helpful in debugging LINQ queries or learning about the internals of LINQ and Rx.
download (Rx versions: .Net3.5 v1.0.2698.0 and SL3 v1.0.2698.0) Always check for the newest version at the beginning of the post.

Thursday, November 12, 2009

Linq to ICollectionView

Currently I'm working on a project written in Silverlight and I have encountered an interface called ICollectionView and its standard implementation, PagedCollectionView. In short, these components allow us to create a view of a collection of items with filtering, grouping and sorting functionality. The idea behind this interface is very similar to the DataView/DataTable mechanism. DataTable is responsible for storing data, and DataView is just an appropriately configured proxy (only filtering and sorting in this case) which can be bound to UI controls. Let's look at a very simple example:

public class Number
{
    public int Value { get; set; }
    public int Random { get; set; }

    public static Number[] GetAll()
    {
        var random = new Random();
        return
            (from n in Enumerable.Range(1,5)
             from m in Enumerable.Repeat(n, n)
             select new Number {Value = n, Random = random.Next(10)}).ToArray();            
    }
}

// Silverlight
public class LinqToICollectionView : UserControl
{
    public LinqToICollectionView()
    {
        Number[] numbers = Number.GetAll();

        Content = new StackPanel
        {
            Orientation = Orientation.Horizontal,
            Children =
            {
                new DataGrid { ItemsSource = new PagedCollectionView(numbers).SetConfiguration()},
            }
        };
    }
}

public static class Configurator
{
    public static ICollectionView SetConfiguration(this ICollectionView view)
    {
        // filtering
        view.Filter = (object o) => ((Number)o).Value < 5;
        // grouping
        view.GroupDescriptions.Add(new PropertyGroupDescription("Value"));
        // sorting
        view.SortDescriptions.Add(new SortDescription("Value", ListSortDirection.Descending));
        view.SortDescriptions.Add(new SortDescription("Random", ListSortDirection.Ascending));
        return view;
    }     
}

image

But wait a minute, we said ... filtering, ordering, grouping? Let's use a LINQ query to configure ICollectionView:

public static class Configurator
{
    public static ICollectionView SetConfigurationWithLinq(this ICollectionView view)
    {
        var q =
            from n in new View<Number>()
            where n.Value < 5
            orderby n.Value descending, n.Random
            group n by n.Value;
        q.Apply(view);
        return view;
    }        
}

The whole implementation consists of 3 simple classes: View<T>, SortedView<T> and GroupedView<T>.

public class View<T>
{
    public IEnumerable<GroupDescription> GroupDescriptions { get { ... } }
    public IEnumerable<SortDescription> SortDescriptions { get { ... } }
    public Func<T,bool> Filter { get { ... } }
    
    public View<T> Where(Func<T, bool> func) { ... }
    public SortedView<T> OrderBy<T2>(Expression<Func<T, T2>> func) { ... }
    public SortedView<T> OrderByDescending<T2>(Expression<Func<T, T2>> func) { ... }
    public GroupedView<T> GroupBy<T2>(Expression<Func<T, T2>> func) { ... }
    
    public void Apply(ICollectionView collectionView) { ... }
}
public sealed class SortedView<T> : View<T>
{    
    public SortedView<T> ThenBy<T2>(Expression<Func<T, T2>> func) { ... }
    public SortedView<T> ThenByDescending<T2>(Expression<Func<T, T2>> func)  { ... }
}
public sealed class GroupedView<T> : View<T>
{
    public GroupedView<T> ThenBy<T2>(Expression<Func<T, T2>> func) { ... }
}

Methods for sorting and grouping, which take an expression tree as a parameter, analyze the tree looking for the indicated members (fields or properties) and collect the appropriate SortDescription and GroupDescription objects. The Where method takes a delegate, which is combined via the logical AND operator with the filter delegate set previously (in case the Where method is called many times). Of course the same mechanism also works in WPF.

Number[] numbers = Number.GetAll();

// WPF
new Window
{
    Content = new StackPanel
    {
        Orientation = Orientation.Horizontal,
        Children =
        {
            CreateListView(new CollectionViewSource { Source = numbers }.View.SetConfiguration()),
            CreateListView(new CollectionViewSource { Source = numbers }.View.SetConfigurationWithLinq())
        }
    }
}
.ShowDialog();


private static ListView CreateListView(ICollectionView view)
{
    return new ListView
    {
        GroupStyle = { GroupStyle.Default },
        View = new GridView
        {
            Columns =
            {
                new GridViewColumn { Header = "Value", DisplayMemberBinding = new Binding("Value") },
                new GridViewColumn { Header = "Random", DisplayMemberBinding = new Binding("Random") },
            }
        },
        ItemsSource = view
    };
}
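The two mechanisms described earlier, reading a member name out of the expression tree passed to OrderBy or GroupBy and chaining Where filters with a logical AND, could look roughly like the sketch below. It is an assumption about how View<T> might work internally, not the code from the download; ViewSketch, GetMemberName and And are hypothetical names, and Number is reduced to its two properties.

```csharp
using System;
using System.Linq.Expressions;

public class Number
{
    public int Value { get; set; }
    public int Random { get; set; }
}

public static class ViewSketch
{
    // Returns "Value" for n => n.Value. The compiler may wrap the member access
    // in a Convert node (for example when TKey is object), so that is unwrapped first.
    public static string GetMemberName<T, TKey>(Expression<Func<T, TKey>> selector)
    {
        Expression body = selector.Body;

        var unary = body as UnaryExpression;
        if (unary != null && unary.NodeType == ExpressionType.Convert)
            body = unary.Operand;

        var member = body as MemberExpression;
        if (member == null)
            throw new ArgumentException("Expected a field or property access.", "selector");

        return member.Member.Name;
    }

    // Combines a previously set filter with a new one; both must accept the item.
    public static Func<T, bool> And<T>(Func<T, bool> existing, Func<T, bool> added)
    {
        if (existing == null) return added;
        return x => existing(x) && added(x);
    }
}
```

The returned name is what a SortDescription or PropertyGroupDescription would be constructed with, and the combined delegate is what would eventually be assigned to the Filter property of the ICollectionView (after adapting it to take an object).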

Finally, I'd like to mention one interesting thing. We only support filtering, sorting and grouping, and don't support, for instance, projection, joining and so on. That's why this code should compile:

var v1 = // only filtering specified
    from n in new View<Number>() 
    where n.Value < 5 
    select n;

var v2 = // grouping (last grouping definition overrides previous ones)
    from n in new View<Number>()
    group n by n.Random into s 
    group s by s.Value;

but this will not:

var v3 = // joining is not supported
    from p in new View<Number>()
    join pp in new[] { 1, 2, 3, 4 } on p.Random equals pp
    select p;

var v4 = // projection is not supported
    from p in new View<Number>()
    where p.Value > 5
    select p.Random;

var v5 = // at least one filtering, grouping or sorting definition must be specified
    from p in new View<Number>()
    select p;

var v6 = // Number type is the only valid type of grouped element
    from p in new View<Number>()
    group p.Random by p.Value;

As homework, I leave you with a question: why does it work this way? :)

Sources

Sunday, August 30, 2009

Incremental find with Reactive Framework

This time we will implement "incremental find" using the Reactive Framework. It is a very good example of what Rx is all about.

Let's say we have a Windows Forms application with a single TextBox. When the user stops typing, the application immediately sends a request to a remote web service to find all words that contain the text from the TextBox. We use Rx to handle two important issues:

  • how to detect the moment when the user has just stopped typing?
  • how to ensure that the correct results are displayed when the web service call is an asynchronous operation (the request for the most recent input should cause the responses to all previous requests to be discarded)?

For the sake of simplicity we simulate the web service call with a simple asynchronous operation that returns results after 3 or 6 seconds, depending on the length of the text. This allows us to easily check whether the correct results are displayed every time. Just type 'iobserv', wait for about 2 seconds (the user has stopped typing) and then append the letter 'e'. A few seconds later (the 1-second pause plus the 3-second simulated call) the results for 'iobserve' should be displayed and never change afterwards.

const string RxOverview =
@"
http://channel9.msdn.com/shows/Going+Deep/Expert-to-Expert-Brian-Beckman-and-Erik-Meijer-Inside-the-NET-Reactive-Framework-Rx/

Now, what is Rx?

The .NET Reactive Framework (Rx) is the mathematical dual of LINQ to Objects. It consists of a pair 
of interfaces IObserver / IObservable that represent push-based, or observable, collections, plus a 
library of extension methods that implement the LINQ Standard Query Operators and other useful 
stream transformation functions.
... 
Observable collections capture the essence of the well-known subject/observer design pattern, and 
are tremendously useful for dealing with event-based and asynchronous programming, i.e. AJAX-style 
applications. For example, here is the prototypical Dictionary Suggest written using LINQ query 
comprehensions over observable collections:

IObservable<Html> q = from fragment in textBox
               from definitions in Dictionary.Lookup(fragment, 10).Until(textBox)
               select definitions.FormatAsHtml();

q.Subscribe(suggestions => { div.InnerHtml = suggestions; })
";

var textBox = new TextBox();
var label = new Label
                {
                    Text = "results...",
                    Location = new Point(0, 40),
                    Size = new Size(300, 500),
                    BorderStyle = BorderStyle.FixedSingle,                                
                };
var form = new Form { Controls = { textBox, label } };

Func<string, IObservable<string[]>> search = (s) =>
{
    var subject = new Subject<string[]>();

    ThreadPool.QueueUserWorkItem((w) =>
    {
        Thread.Sleep(s.Length % 2 == 0 ? 3000 : 6000);

        var result = RxOverview.
            Split(new[] { " ", "\n", "\t", "\r" }, StringSplitOptions.RemoveEmptyEntries).
            Where(t => t.ToLower().Contains(s.ToLower())).
            ToArray();

        subject.OnNext(result);
        subject.OnCompleted();
    }, null);

    return subject;
};

IObservable<Event<EventArgs>> textChanged = textBox.GetObservableTextChanged();

var q =
    from e in textChanged
    let text = (e.Sender as TextBox).Text
    from x in Observable.Return(new Unit()).Delay(1000).Until(textChanged)  // first issue
    from results in search(text).Until(textChanged)                         // second issue
    select new { text, results };

var a1 = q.Send(SynchronizationContext.Current).Subscribe(r =>
{
    label.Text = string.Format(" Text: {0}\n Found: {1}\n Results:\n{2}",
        r.text,
        r.results.Length,
        string.Join("\n", r.results));
});


form.ShowDialog();

Now try to implement the same functionality without the Rx Framework.

Sources

"Drag and drop" with Reactive Framework

Just look how easily we can implement "drag and drop" functionality using Reactive Framework.

var form = new Form
{
    Controls =
       {
           new Label {Text = "label1", BorderStyle = BorderStyle.FixedSingle},                       
           new Button {Text = "button1"},
           new Label {Text = "label2", BorderStyle = BorderStyle.FixedSingle},
       }
};

Func<Control, IObservable<Event<MouseEventArgs>>> mouseDown = c => c.GetObservableMouseDown();
Func<Control, IObservable<Event<MouseEventArgs>>> mouseUp = c => c.GetObservableMouseUp();
Func<Control, IObservable<Event<MouseEventArgs>>> mouseMove = c => c.GetObservableMouseMove();

var q =
    from Control con in form.Controls
    select
    (
        from d in mouseDown(con)
        from u in mouseMove(con).Until(mouseUp(con))
        select u
    );

q.Merge().Subscribe(args =>
{
    var control = args.Sender as Control;
    control.Location = Point.Add(control.Location,
        new Size(args.EventArgs.X, args.EventArgs.Y));
});

form.ShowDialog();

Now all controls on the form can be moved from one place to another :)

[Image: drag_drop]
Sources.

Inside LINQ queries

This time we will dig a little deeper into LINQ queries. We will see what the C# 3.0 compiler does behind the scenes when it meets a LINQ query in code, and how we can make use of it. Let's look at a simple LINQ query:

var q =
    from i in new[] {1, 2, 3, 4, 5, 6}
    where i < 5
    select i.ToString("C");

The compiler handles a LINQ query in two phases. First it translates the query into a chain of method calls: each operator in the LINQ syntax, such as where, select, ..., is translated into the corresponding method Where, Select, ... .

var q =
    new[] {1, 2, 3, 4, 5, 6}.
    Where(i => i < 5).
    Select(i => i.ToString("C"));

So the where operator was translated into the Where method and select into the Select method. As you can see, lambda expressions have also been generated and passed as the methods' parameters. The second phase is a discovery phase, in which the compiler looks for matching methods. In our sample the type "array of int" does not have an instance method named Where, so the compiler analyzes suitable extension methods. The closest extension method with matching parameters is defined in the System.Linq.Enumerable class, so this is the one that will be used. Where is a generic method and in our case it returns IEnumerable<int>. Conceptually, the implementation of the Where method is quite simple and looks roughly like this:

public static class Enumerable
{
    public static IEnumerable<TSource> Where<TSource>(this IEnumerable<TSource> source,
        Func<TSource, bool> predicate)
    {
        foreach (var i in source)
            if(predicate(i))
                yield return i;
    }        
}

Now the compiler looks for the Select method. Once again the type IEnumerable<int> does not contain an instance Select method, but the Enumerable class contains a matching one. Finally, our query returns IEnumerable<string> and the build succeeds because all the necessary methods have been found. If you are interested in how more complicated queries are translated and you are using ReSharper, just point your mouse at a LINQ query, press Alt + Enter and choose the "Convert LINQ to methods chain" option. ReSharper will automatically generate the method chain for you.

Now we can see that a LINQ query is just a chain of method calls, nothing more. The real power of LINQ lies in the implementation of these methods. .NET Framework 3.5 gives us the Enumerable class, which contains about 50 methods with many different overloads, such as Where, Select, Join, GroupBy, OrderBy, etc. Almost all of these methods extend IEnumerable<T>, so they can be used with every type implementing this interface, which applies to almost all collections in the .NET Framework. I truly encourage you to familiarize yourself with those methods because they are really powerful and only a few of them are used in simple from ... where ... select LINQ queries.

Once we know how LINQ queries are handled by C# compiler we are ready to do some tricky stuff.

Let's say we would like to use the second overload of the Where method from the Enumerable class inside a LINQ query. This overload allows us to use the index of the iterated item, and its implementation looks like this:

public static class Enumerable
{
    public static IEnumerable<TSource> Where<TSource>(this IEnumerable<TSource> source,
        Func<TSource, int, bool> predicate)
    {
        int index = 0;
        foreach (var i in source)
            if (predicate(i, index++))
                yield return i;
    }    
}

This means that we can write a condition based on the index of the items stored in the queried sequence. Although we cannot extend the syntax of a LINQ query, we can extend the IEnumerable<T> type with our own Where method implementation that changes the type of the second parameter.

static class EnhancedEnumerable
{
    public static IEnumerable<T> Where<T>(this IEnumerable<T> collection,
        Func<T, Func<int, bool>> superPredicate)
    {
        int index = 0;
        foreach (var item in collection)
            if (superPredicate(item).Invoke(index++))
                yield return item;
    }
}

Now we are able to write a query based on the index of an item.

var q =
    from i in new[] { 1, 2, 3, 4, 5, 6 }
    where index  => 
        {
            Console.WriteLine("{0}. {1}", index, i);
            return (index % 2 == 0) && (i < 5);
        }
    select i.ToString("C");
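Applying the translation rules described earlier, the query above should become a method chain in which the lambda passed to Where returns another lambda. The sketch below spells that chain out by hand. It repeats the EnhancedEnumerable class so that it is self-contained, drops the Console.WriteLine call and the "C" format for brevity, and the IndexedWhereDemo class is a name I made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class EnhancedEnumerable
{
    // Same shape as the method from the post: the predicate takes an item
    // and returns a second predicate over the item's index.
    public static IEnumerable<T> Where<T>(this IEnumerable<T> collection,
        Func<T, Func<int, bool>> superPredicate)
    {
        int index = 0;
        foreach (var item in collection)
            if (superPredicate(item).Invoke(index++))
                yield return item;
    }
}

// Hypothetical demo class, not part of the original sources.
static class IndexedWhereDemo
{
    public static string[] Run()
    {
        var q =
            new[] { 1, 2, 3, 4, 5, 6 }
            // Hand-written equivalent of "where index => ...".
            .Where(i => index => (index % 2 == 0) && (i < 5))
            .Select(i => i.ToString());

        return q.ToArray();
    }
}
```

Only our overload is applicable here, because a lambda that returns a lambda cannot be converted to Func<int, bool> or Func<int, int, bool>, so the standard Enumerable.Where overloads are ruled out. Run() keeps the items at even positions that are also smaller than 5, that is "1" and "3".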

Another interesting thing we can do with LINQ queries is to provide a set of objects describing some domain, equipped with appropriate "LINQ methods" called Where, Select, OrderBy and so on, so that we can write LINQ queries over them. This approach gives us very strong control over the LINQ query syntax, because already at compile time we can choose which LINQ operators we support and which we don't. Let's look at an example.

Let's assume that we have a company and a simple application storing data about our employees. The data is stored in files on disk, with each employee stored in a separate file. Additionally, the files are grouped into folders by some common feature. For example, employees working in a particular department are placed in the same folder. Let's also assume that the main types describing our domain look like this:

[Class diagram image: Company]

Let's start with a simple LINQ query:

Company company = new Company();
var q =
    from e in company
    select e.Name;

This code will not compile because the compiler cannot find a matching Select method, so let's provide it.

public class Company
{
    public IEnumerable<T> Select<T>(Func<Employee, T> p) { ... }
}

It is worth mentioning that the variable e in our query is an instance of the Employee type, because the matched Select method takes Employee as the first parameter of the lambda expression. Now let's provide support for the where operator.

var q =
    from e in company
    where e.Salary > 100
    select e.Name;

public class Company
{
    public Employee Where(Func<Employee, bool> p) { ... }
}

public abstract class Employee<T>
{
    public IEnumerable<T2> Select<T2>(Func<T, T2> p) { ... }
}

public sealed class Employee : Employee<Employee> { }
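To make the idea concrete, here is a minimal runnable sketch of a type that supports only where and select in query syntax. It is a simplification under my own assumptions, not the code from the downloadable solution: MiniCompany, MiniEmployee and MiniCompanyDemo are hypothetical names, and the data lives in memory instead of files.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for the post's Employee type.
public sealed class MiniEmployee
{
    public string Name { get; set; } = "";
    public int Salary { get; set; }
}

// Hypothetical stand-in for Company. It does not implement IEnumerable<T>,
// so only the operators defined below are available in queries.
public sealed class MiniCompany
{
    private readonly List<MiniEmployee> employees;

    public MiniCompany(IEnumerable<MiniEmployee> employees)
    {
        this.employees = new List<MiniEmployee>(employees);
    }

    // Enables the "where" clause.
    public MiniCompany Where(Func<MiniEmployee, bool> predicate)
    {
        return new MiniCompany(employees.FindAll(e => predicate(e)));
    }

    // Enables the "select" clause.
    public IEnumerable<TResult> Select<TResult>(Func<MiniEmployee, TResult> selector)
    {
        foreach (var e in employees)
            yield return selector(e);
    }
}

public static class MiniCompanyDemo
{
    public static List<string> Run()
    {
        var company = new MiniCompany(new[]
        {
            new MiniEmployee { Name = "Ann", Salary = 150 },
            new MiniEmployee { Name = "Bob", Salary = 80 },
        });

        // Compiles because MiniCompany has matching Where and Select methods.
        // Adding "orderby e.Name" would fail to compile: there is no OrderBy.
        var q =
            from e in company
            where e.Salary > 100
            select e.Name;

        return new List<string>(q);
    }
}
```

Note that the file deliberately has no using System.Linq directive, so the compiler can bind the query only to the instance methods declared on MiniCompany.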

As you can see, this is a very powerful mechanism which lets us provide only those operators that are supported in our "query language". We could say that it is an alternative to writing a custom LINQ provider when we want to integrate LINQ queries with our specific domain. A LINQ provider is much more flexible when it comes to analyzing and executing queries, but this approach has one very important feature: we can choose which operators we provide and what parameters a particular operator can take. Our query is validated at compile time, which means that if we don't support, for instance, the sort operator and some query uses orderby, compilation will fail with an error saying that the OrderBy method cannot be found. When writing a custom LINQ provider, we have to validate the LINQ query at runtime ourselves. To make this clear, see the next queries.

var q =
    from Manager m in company
    select m.ManagedProjects;
    
// this query is translated into: q = company.Cast<Manager>().Select(m => m.ManagedProjects);

public class Company
{
    public T Cast<T>() where T : ICast { ... }
}

public interface ICast
{ }
    
public sealed class Developer : Employee<Developer>, ICast { }
public sealed class Manager : Employee<Manager>, ICast { }

We provide the Cast operator for the Company type so that we can use the Developer or Manager type in the from clause (in fact, any type implementing ICast). Please notice that the type of the variable m in our query is Manager, not Employee. The next query shows an even more interesting feature.

var q =
   from e in company
   where e.Salary > 100     
   where e.IsManager
   where e.ManagedProjects > 1
   select new { e.Name, e.ManagedProjects };
   
public sealed class Employee
{
    public Manager IsManager { get { return null; } }

    public Manager Where(Func<Employee, Manager> p) { ... }
}  

Here the type of the variable e in the first part of the query is Employee, but after the line "where e.IsManager" the type of e is Manager. As we saw, we may provide support for any subset of LINQ operators, but that's not all. We can also provide a particular operator only for a specified type, for example "only managers can be sorted" or "we can group employees only by Department or Location":

var q =
    from Manager m in company                  
    orderby m.ManagedProjects descending, m.Name
    select new { m.Name, m.ManagedProjects };

// q = company.Cast<Manager>().
//    OrderByDescending(m => m.ManagedProjects).ThenBy(m => m.Name).
//    Select(m => new {m.Name, m.ManagedProjects});

public sealed class Manager
{
    public Manager OrderBy<T>(Func<Manager, T> p) { ... }
    public Manager OrderByDescending<T>(Func<Manager, T> p) { ... }
    public Manager ThenBy<T>(Func<Manager, T> p) { ... }
    public Manager ThenByDescending<T>(Func<Manager, T> p) { ... }
}


var q =
    from Developer e in company
    group e by e.Location;
    
// q = company.Cast<Developer>().GroupBy(e => e.Location);

public abstract class Employee<T>    
{
    public Location Location { get; protected set; }
    public Department Department { get; protected set; }
    
    public IEnumerable<Group<Location,T>> GroupBy(Func<T, Location> p) { ... }
    public IEnumerable<Group<Department, T>> GroupBy(Func<T, Department> p) { ... }
}  

public class Group<TKey, T>
{
    public TKey Key { get; private set;}
    public IEnumerable<T> Items { get; private set;}
    
    internal Group(TKey key, IEnumerable<T> items)
    {
        Key = key;
        Items = items;
    }
}

Finally, I'd like to show you how smart the C# 3.0 compiler is. When the compiler meets a LINQ query with a simple select operator like this:

var q =
    from i in new[] {1, 2, 3, 4, 5, 6}
    where i < 5
    select i;

a small optimization takes place. After translation our query would look like this:

var q =
    new[] {1, 2, 3, 4, 5, 6}.
    Where(i => i < 5).
    Select(i => i);

Calling the Select method here probably does not change anything (it converts i into i), so this call is not generated and the final code looks like this:

var q =
    new[] {1, 2, 3, 4, 5, 6}.
    Where(i => i < 5);  

And of course in this particular case (LINQ to Objects) that is true. But if we provide our own Select method returning DateTime, and if we place the EnhancedEnumerable class "closer" to the above query (for instance in the same namespace), then our method would be used instead of the standard LINQ operator (System.Linq.Enumerable.Select).

static class EnhancedEnumerable
{
    public static DateTime Select<TSource, TResult>(this IEnumerable<TSource> source,
        Func<TSource, TResult> predicate)
    {
        return DateTime.Now;
    }
}

In such a case, will the optimization take place or not? Yes, it will. The optimization is performed in the first phase of query translation, when we don't yet know which Select method will be used or even whether any exists. There are more such optimizations, so we should be careful when writing more advanced queries.
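One way to observe this behavior is to query a type that records which query methods actually get called. The sketch below is only an illustration: Traced and TracedDemo are hypothetical names, and the methods ignore their arguments because only the calls themselves matter here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical type that records which query-pattern methods are invoked.
public sealed class Traced
{
    public List<string> Calls { get; } = new List<string>();

    public Traced Where(Func<int, bool> predicate)
    {
        Calls.Add("Where");
        return this;
    }

    public Traced Select(Func<int, int> selector)
    {
        Calls.Add("Select");
        return this;
    }
}

public static class TracedDemo
{
    // "where ... select i": the trivial select is dropped during translation.
    public static string WithWhere()
    {
        var source = new Traced();
        var q =
            from i in source
            where i < 5
            select i;
        return string.Join(",", q.Calls);   // "Where"
    }

    // "select i" alone: Select is kept, so the query result is never
    // the source object itself.
    public static string SelectOnly()
    {
        var source = new Traced();
        var q =
            from i in source
            select i;
        return string.Join(",", q.Calls);   // "Select"
    }
}
```

The second method shows the one exception to the optimization: when the query consists of nothing but a trivial select, the Select call is preserved.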

Treat this post as a prerequisite for next few posts where we will be talking more about LINQ internals.

The whole "queryable" solution with classes: Company, Employee, Developer and so on can be downloaded from here.