First encounters with Reactive Extensions

I’ve been researching Reactive Extensions for the last few days, with an eye to writing a short section in chapter 12 of the second edition of C# in Depth. (This is the most radically changed chapter from the first edition; it will be covering LINQ to SQL, IQueryable, LINQ to XML, Parallel LINQ, Reactive Extensions, and writing your own LINQ to Objects operators.) I’ve watched various videos from Channel 9, but today was the first time I actually played with it. I’m half excited, and half disappointed.

My excited half sees that there’s an awful lot to experiment with, and loads to learn about join patterns etc. I’m also looking forward to trying genuine events (mouse movements etc) – so far my tests have been to do with collections.

My disappointed half thinks it’s missing something. You see, Reactive Extensions shares some concepts with my own Push LINQ library… except it’s had smarter people (no offense meant to Marc Gravell) working harder on it for longer. I’d expect it to be easier to use, and make it a breeze to do anything you could do in Push LINQ. Unfortunately, that’s not quite the case.

Subscription model

First, the way that subscription is handled for collections seems slightly odd. I’ve been imagining two kinds of observable sources:

  • Genuine "event streams" which occur somewhat naturally – for instance, mouse movement events. Subscribing to such an observable wouldn’t do anything to it other than adding subscribers.
  • Collections (and the like) where the usual use case is "set up the data pipeline, then tell it to go". In that case calling Subscribe should just add the relevant observers, but not actually "start" the sequence – after all, you may want to add more observers (we’ll see an example of this in a minute).

In the latter case, I could imagine an extension method on IEnumerable<T> called ToObservable which would return a StartableObservable<T> or something like that – you’d subscribe whatever observers you wanted, and then call Start on the StartableObservable<T>. That’s not what appears to happen though – if you call ToObservable(), you get an implementation which iterates over the source sequence as soon as anything subscribes to it, which just doesn’t feel right to me. Admittedly it makes life easy in the case where that really is all you want to do, but it’s a pain otherwise.
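
To make the idea concrete, here’s a very rough sketch of the sort of type I have in mind – StartableObservable<T> and Start are entirely my invention rather than anything in Rx, it assumes Rx’s Disposable.Create helper, and it ignores thread safety and error handling:

public sealed class StartableObservable<T> : IObservable<T>
{
    private readonly IEnumerable<T> source;
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public StartableObservable(IEnumerable<T> source)
    {
        this.source = source;
    }

    // Subscribing merely registers the observer; nothing is pushed yet
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return Disposable.Create(() => observers.Remove(observer));
    }

    // Only when Start is called is the source iterated, pushing each item
    // to every observer registered so far
    public void Start()
    {
        foreach (T item in source)
        {
            observers.ForEach(observer => observer.OnNext(item));
        }
        observers.ForEach(observer => observer.OnCompleted());
    }
}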

There’s a way of working round this in Reactive Extensions: Subject<T>, which is both an observer and an observable. You can create a Subject<T>, Subscribe all the observers you want (so as to set up the data pipeline) and then subscribe the subject to the real data source. It’s not exactly hard, but it took me a while to work out, and it feels a little unwieldy.
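
In miniature, the workaround looks something like this – just a sketch, using the same ToObservable(Scheduler.Now) overload as the examples later in this post:

var subject = new Subject<int>();

// Build the pipeline against the subject first; nothing is pushed yet
subject.Where(x => x % 2 == 0)
       .Subscribe(x => Console.WriteLine("Even: {0}", x));
subject.Select(x => x * x)
       .Subscribe(x => Console.WriteLine("Square: {0}", x));

// Only now do we connect the subject to the real data source, at which
// point every value is pushed through both pipelines
Enumerable.Range(1, 5).ToObservable(Scheduler.Now).Subscribe(subject);

The next issue was somewhat more problematic.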

Blocking aggregation

When I first started thinking about Push LINQ, it was motivated by a scenario from the C# newsgroup: someone wanted to group a collection in a particular way, and then count how many items were in each group. This is effectively the "favourite colour voting" scenario outlined in the link at the top of this post. The key problem is that the normal Count() call is blocking: it fetches items from a collection until there aren’t any more, so it’s effectively in control of the execution flow. That means that if you call it in a grouping construct, the whole group has to be available before you can call Count() – so you can’t stream an enormous data set, which is unfortunate.
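
For comparison, here’s what the blocking LINQ to Objects version of such a query looks like – a sketch using .NET 4’s lazy File.ReadLines as the source, grouping lines by length (the example I’ll come back to later):

IEnumerable<string> lines = File.ReadLines("../../Program.cs");

// GroupBy has to buffer the whole source, and each Count() call blocks
// until its group is complete - fine for a small file, but it means the
// data can't simply be streamed past once
var histogram = from line in lines
                group line by line.Length into grouped
                select new { Length = grouped.Key, Count = grouped.Count() };

foreach (var entry in histogram)
{
    Console.WriteLine("Length: {0}; Count: {1}", entry.Length, entry.Count);
}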

In Push LINQ, I addressed this by making Count() return Future<int> instead of int. The whole query is evaluated, and then you can ask each future for its actual result. Unfortunately, that isn’t the approach that the Reactive Framework has taken – it still returns int from Count(). I don’t know the reason for this, but fortunately it’s somewhat fixable. We can’t change Observable of course, but we can add our own future-based extensions:

public static class ObservableEx
{
    public static Task<TResult> FutureAggregate<TSource, TResult>
        (this IObservable<TSource> source,
        TResult seed, Func<TResult, TSource, TResult> aggregation)
    {
        TaskCompletionSource<TResult> result = new TaskCompletionSource<TResult>();
        TResult current = seed;
        source.Subscribe(value => current = aggregation(current, value),
            error => result.SetException(error),
            () => result.SetResult(current));
        return result.Task;
    }

    public static Task<int> FutureMax(this IObservable<int> source)
    {
        // TODO: Make this generic and throw exception on
        // empty sequence. Left as an exercise for the reader.
        return source.FutureAggregate(int.MinValue, Math.Max);
    }

    public static Task<int> FutureMin(this IObservable<int> source)
    {
        // TODO: Make this generic and throw exception on
        // empty sequence. Left as an exercise for the reader.
        return source.FutureAggregate(int.MaxValue, Math.Min);
    }

    public static Task<int> FutureCount<T>(this IObservable<T> source)
    {
        return source.FutureAggregate(0, (count, _) => count + 1);
    }
}

This uses Task<T> from Parallel Extensions, which gives us an interesting ability, as we’ll see in a moment. It’s all fairly straightforward – TaskCompletionSource<T> makes it very easy to specify a value when we’ve finished, or indicate that an error occurred. As mentioned in the comments, the maximum/minimum implementations leave something to be desired, but it’s good enough for a blog post :)
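
For what it’s worth, a generic maximum which faults the task on an empty sequence might look something like this – just a sketch of the exercise from the comments above, again with no attempt at thread safety:

public static Task<T> FutureMax<T>(this IObservable<T> source)
{
    IComparer<T> comparer = Comparer<T>.Default;
    var result = new TaskCompletionSource<T>();
    bool seenAny = false;
    T current = default(T);
    source.Subscribe(
        value =>
        {
            // Keep the largest value seen so far
            if (!seenAny || comparer.Compare(value, current) > 0)
            {
                current = value;
            }
            seenAny = true;
        },
        error => result.SetException(error),
        () =>
        {
            // An empty sequence has no maximum, so fault the task
            if (seenAny)
            {
                result.SetResult(current);
            }
            else
            {
                result.SetException(new InvalidOperationException("Sequence contains no elements"));
            }
        });
    return result.Task;
}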

Using the non-blocking aggregation operators

Now that we’ve got our extension methods, how can we use them? First I decided to do a demo which would count the number of lines in a file, and find the maximum and minimum line lengths:

private static IEnumerable<string> ReadLines(string filename)
{
    using (TextReader reader = File.OpenText(filename))
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            yield return line;
        }
    }
}

var subject = new Subject<string>();
var lengths = subject.Select(line => line.Length);
var min = lengths.FutureMin();
var max = lengths.FutureMax();
var count = lengths.FutureCount();
            
var source = ReadLines("../../Program.cs");
source.ToObservable(Scheduler.Now).Subscribe(subject);
Console.WriteLine("Count: {0}, Min: {1}, Max: {2}",
                  count.Result, min.Result, max.Result);

As you can see, we use the Result property of a task to find its eventual result – this call will block until the result is ready, however, so you do need to be careful about how you use it. Each line is only read from the file once, and pushed to all three observers, who carry their state around until the sequence is complete, whereupon they publish the result to the task.

I got this working fairly quickly – then went back to the "grouping lines by line length" problem I’d originally set myself. I want to group the lines of a file by their length (all lines of length 0, all lines of length 1 etc) and count each group. The result is effectively a histogram of line lengths. Constructing the query itself wasn’t a problem – but iterating through the results was. Fundamentally, I don’t understand the details of ToEnumerable yet, particularly the timing. I need to look into it more deeply, but I’ve got two alternative solutions for the moment.

The first is to implement my own ToList extension method. This simply creates a list and subscribes an observer which adds items to the list as it goes. There’s no attempt at "safety" here – if you access the list before the source sequence has completed, you’ll see whatever has been added so far. I am still just experimenting :) Here’s the implementation:

public static List<T> ToList<T>(this IObservable<T> source)
{
    List<T> ret = new List<T>();
    source.Subscribe(x => ret.Add(x));
    return ret;
}

Now we can construct a query expression, project each group using our future count, make sure we’ve finished pushing the source before we read the results, and everything is fine:

var subject = new Subject<string>();
var groups = from line in subject
             group line.Length by line.Length into grouped
             select new { Length = grouped.Key, Count = grouped.FutureCount() };
var results = groups.ToList();

var source = ReadLines("../../Program.cs");
source.ToObservable(Scheduler.Now).Subscribe(subject);
foreach (var group in results)
{
    Console.WriteLine("Length: {0}; Count: {1}", group.Length, group.Count.Result);
}

Note how the call to ToList is required before calling source.ToObservable(...).Subscribe – otherwise everything would have been pushed before we started collecting it.

All well and good… but there’s another way of doing it too. We’ve only got a single task being produced for each group – instead of waiting until everything’s finished before we dump the results to the console, we can use Task.ContinueWith to write it (the individual group result) out as soon as that group has been told that it’s finished. We force this extra action to occur on the same thread as the observer just to make things easier in a console app… but it all works very neatly:

var subject = new Subject<string>();
var groups = from line in subject
             group line.Length by line.Length into grouped
             select new { Length = grouped.Key, Count = grouped.FutureCount() };
                                    
groups.Subscribe(group =>
{
    group.Count.ContinueWith(
         x => Console.WriteLine("Length: {0}; Count: {1}",
                                group.Length, x.Result),
         TaskContinuationOptions.ExecuteSynchronously);
});
var source = ReadLines("../../Program.cs");
source.ToObservable(Scheduler.Now).Subscribe(subject);

Conclusion

That’s the lot, so far. It feels like I’m sort of in the spirit of Reactive Extensions, but that maybe I’m pushing it (no pun intended) in a direction which Erik and Wes either didn’t anticipate, or at least don’t view as particularly valuable/elegant. I very much doubt that they didn’t consider deferred aggregates – it’s much more likely that either I’ve missed some easy way of doing this, or there are good reasons why it’s a bad idea. I hope to find out which at some point… but in the meantime, I really ought to work out a more idiomatic example for C# in Depth.

14 thoughts on “First encounters with Reactive Extensions”

  1. @Scott: I don’t think so. That’s still calling Count() “inline” – so it’s going to block until the sequence has finished, and then return the value on a different thread.

    If you change it so that it only computes the value on a different thread and *then* returns it, then it might fix it – but at the cost of one thread per group, which is also hideous (and potentially a cause of deadlock if there are enough groups).

    Essentially we want multiple counts increasing at the same time – that *either* means multiple threads, *or* it means a non-blocking Count() call. As Count() itself *is* blocking and we don’t want extra threads (IMO), that means we can’t use Count(). I expect I would find a problem with *any* solution using Count(). Either that, or we’d all discover that I’m a complete muppet. At this point, both outcomes seem equally likely.

  2. You’re right, it needs to be something like

    Observable.Defer( () => xs.Count())

    That said, if you’re willing to work with running values, Scan(), GroupBy(), and SelectMany() can do lots

  3. I think they got too caught up in the mathematical duality, and lost sight of the intention of a reactive framework in the process. The primary design goal should have been “providing a clean way of working with an event-based architecture”, which IMHO is different from “providing a cool library based on the mathematical dual of IEnumerable”.

  4. @Dax: I don’t think I’m in a position to say that yet. I’m still very excited about the whole thing, and there’s a *lot* to learn. It’ll be interesting to see how mainstream this becomes.

  5. It could go either way, so I’m looking forward to playing with it. I have to admit my initial (humourous) reaction was similar to Dax’s: Erik says it’s the exact mirror opposite of IEnumerable in every respect, and we know IEnumerable is very useful. So what’s the exact opposite of “very useful”? :)

  6. I have not looked at the Reactive Framework in any detail yet.

    However I would like to be able to databind (say a grid) to the result of a LINQ to Objects query. Then when an object (or collection) is updated in my data model, the grid should just automatically update itself.

    Say..

    ian.age = 40
    people.add(ian)
    oldPeople = from p in people where p.age > 40 select p
    grid.DataSource = oldPeople

    // the grid does not show me

    ian.age = 45

    // the grid now shows me

    I don’t know if the Reactive Framework aims to allow me to do this sort of thing.

    ——–

    The mouse event processing examples with the Reactive Framework just look too “odd” to be useful in the real world. I don’t think normal programmers (including me) are likely to wish to think that way. However, only time will tell.

  7. @Ian: I wouldn’t expect the Reactive Framework to help in that situation.

    As for “normal” programmers not wanting to think in terms of events and asynchronous processing – I suspect it will take a long time, but happen eventually.

  8. Hi Jon,

    I’ve not played with Rx hands-on yet, but I’ve watched a lot of the Channel 9 videos on the topic. Regarding your observations on the Subscription model: Have you looked at Observable.Let()?

    The description for Observable.Let() says “Bind the source to the parameter so that it can be used multiple times without duplication of subscription side-effects.” which suggests it might solve your problem of conveniently subscribing multiple listeners to the same “cold” observable, without generating multiple streams.

    Not sure I’m fully understanding this yet, though, so I may be mistaken.

    Daniel

  9. @Daniel: I don’t have enough experience with Rx to say whether Let would work or not… but I’ll be writing a new post tonight as Wes and Erik have mailed me about this one.

  10. Interesting thoughts Jon,
    I was moderately underwhelmed by Rx after watching the Channel 9 videos. Lots of content on the maths.
    *HOWEVER* I am now on a project that is actively using it with Silverlight 3. Rx is bloody brilliant. It solves so many problems for us in a simple manner. We ‘could’ have written the code to do it before but this provides a great single API for our team to embrace. I think Rx is going to be just part of the way we work just as Generics has become.

    w.r.t. examples for your upcoming book (loved the last version), maybe you could look at how Rx helps with concurrent WPF/WinForms/SL code, or how we can throttle/buffer streaming data (stock ticks), or how we can build memoize functionality in a few lines of code, or the drag and drop example from C9 in Silverlight. All of these examples are solved so simply with Rx, whereas before it was a bit of a pain in the backside.
    my 2c

    Lee
