I’ve been researching Reactive Extensions for the last few days, with an eye to writing a short section in chapter 12 of the second edition of C# in Depth. (This is the most radically changed chapter from the first edition; it will be covering LINQ to SQL, IQueryable, LINQ to XML, Parallel LINQ, Reactive Extensions, and writing your own LINQ to Objects operators.) I’ve watched various videos from Channel 9, but today was the first time I actually played with it. I’m half excited, and half disappointed.
My excited half sees that there’s an awful lot to experiment with, and loads to learn about join patterns etc. I’m also looking forward to trying genuine events (mouse movements etc) – so far my tests have been to do with collections.
My disappointed half thinks it’s missing something. You see, Reactive Extensions shares some concepts with my own Push LINQ library… except it’s had smarter people (no offense meant to Marc Gravell) working harder on it for longer. I’d expect it to be easier to use, and make it a breeze to do anything you could do in Push LINQ. Unfortunately, that’s not quite the case.
Subscription model
First, the way that subscription is handled for collections seems slightly odd. I’ve been imagining two kinds of observable sources:
- Genuine "event streams" which occur somewhat naturally – for instance, mouse movement events. Subscribing to such an observable wouldn’t do anything to it other than adding subscribers.
- Collections (and the like) where the usual use case is "set up the data pipeline, then tell it to go". In that case calling Subscribe should just add the relevant observers, but not actually "start" the sequence – after all, you may want to add more observers (we’ll see an example of this in a minute).
In the latter case, I could imagine an extension method to IEnumerable<T> called ToObservable which would return a StartableObservable<T> or something like that – you’d subscribe what you want, and then call Start on the StartableObservable<T>. That’s not what appears to happen though – if you call ToObservable(), you get an implementation which iterates over the source sequence as soon as anything subscribes to it – which just doesn’t feel right to me. Admittedly it makes life easy in the case where that’s really all you want to do, but it’s a pain otherwise.
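To make the idea concrete, here's a minimal sketch of what I have in mind. StartableObservable&lt;T&gt;, Start and DelegateObserver&lt;T&gt; are all hypothetical names of my own invention, not Rx types – this only uses the IObservable&lt;T&gt;/IObserver&lt;T&gt; interfaces from the BCL:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical: an observable which does nothing until Start is called.
public sealed class StartableObservable<T> : IObservable<T>
{
    private readonly IEnumerable<T> source;
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public StartableObservable(IEnumerable<T> source)
    {
        this.source = source;
    }

    // Subscribing just registers the observer; nothing is pushed yet.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(this, observer);
    }

    // Start iterates the source exactly once, pushing each item to
    // every registered observer, then signals completion.
    public void Start()
    {
        foreach (T item in source)
        {
            foreach (IObserver<T> observer in observers.ToArray())
            {
                observer.OnNext(item);
            }
        }
        foreach (IObserver<T> observer in observers.ToArray())
        {
            observer.OnCompleted();
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly StartableObservable<T> parent;
        private readonly IObserver<T> observer;

        internal Unsubscriber(StartableObservable<T> parent, IObserver<T> observer)
        {
            this.parent = parent;
            this.observer = observer;
        }

        public void Dispose()
        {
            parent.observers.Remove(observer);
        }
    }
}

// Minimal observer wrapping delegates, for demonstration only.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action onCompleted;

    public DelegateObserver(Action<T> onNext, Action onCompleted)
    {
        this.onNext = onNext;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { onCompleted(); }
}
```

The point of the design is that subscription and execution are separated: you can wire up as many observers as you like, and data only flows when you say so.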
There’s a way of working round this in Reactive Extensions: there’s Subject<T> which is both an observer and an observable. You can create a Subject<T>, Subscribe all the observers you want (so as to set up the data pipeline) and then subscribe the subject to the real data source. It’s not exactly hard, but it took me a while to work out, and it feels a little unwieldy. The next issue was somewhat more problematic.
Blocking aggregation
When I first started thinking about Push LINQ, it was motivated by a scenario from the C# newsgroup: someone wanted to group a collection in a particular way, and then count how many items were in each group. This is effectively the "favourite colour voting" scenario outlined in the link at the top of this post. The problem is that the normal Count() call is blocking: it fetches items from a collection until there aren’t any more – it’s effectively in control of the execution flow. That means if you call it in a grouping construct, the whole group has to be available before you call Count(). So you can’t stream an enormous data set, which is unfortunate.
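You can see the blocking behaviour in LINQ to Objects with a side-effecting iterator (purely for illustration) which records how far it has been pulled – Count() doesn't return until every item has been fetched:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BlockingCountDemo
{
    // Records how many items have been pulled from the iterator.
    public static int ItemsPulled;

    // An iterator with a deliberate side effect, so we can observe
    // how far Count() pulls it.
    public static IEnumerable<int> Numbers(int limit)
    {
        for (int i = 0; i < limit; i++)
        {
            ItemsPulled++;
            yield return i;
        }
    }

    // Count() is in control of execution: nothing comes back until
    // the entire sequence has been consumed, so ItemsPulled will
    // equal limit by the time this returns.
    public static int Demo()
    {
        return Numbers(1000).Count();
    }
}
```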
In Push LINQ, I addressed this by making Count() return Future<int> instead of int. The whole query is evaluated, and then you can ask each future for its actual result. Unfortunately, that isn’t the approach that the Reactive Framework has taken – it still returns int from Count(). I don’t know the reason for this, but fortunately it’s somewhat fixable. We can’t change Observable of course, but we can add our own future-based extensions:
public static class ObservableEx
{
    public static Task<TResult> FutureAggregate<TSource, TResult>(
        this IObservable<TSource> source,
        TResult seed, Func<TResult, TSource, TResult> aggregation)
    {
        TaskCompletionSource<TResult> result =
            new TaskCompletionSource<TResult>();
        TResult current = seed;
        source.Subscribe(value => current = aggregation(current, value),
                         error => result.SetException(error),
                         () => result.SetResult(current));
        return result.Task;
    }

    public static Task<int> FutureMax(this IObservable<int> source)
    {
        return source.FutureAggregate(int.MinValue, Math.Max);
    }

    public static Task<int> FutureMin(this IObservable<int> source)
    {
        return source.FutureAggregate(int.MaxValue, Math.Min);
    }

    public static Task<int> FutureCount<T>(this IObservable<T> source)
    {
        return source.FutureAggregate(0, (count, _) => count + 1);
    }
}
This uses Task<T> from Parallel Extensions, which gives us an interesting ability, as we’ll see in a moment. It’s all fairly straightforward – TaskCompletionSource<T> makes it very easy to specify a value when we’ve finished, or indicate that an error occurred. As mentioned in the comments, the maximum/minimum implementations leave something to be desired, but it’s good enough for a blog post :)
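In case the TaskCompletionSource&lt;T&gt; pattern is unfamiliar, here's a standalone sketch of it with nothing Rx-specific (the names are mine): the producer publishes a result or an exception, and anyone holding the task blocks on Result until one arrives.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FutureDemo
{
    public static Task<int> ComputeOnAnotherThread()
    {
        var tcs = new TaskCompletionSource<int>();
        ThreadPool.QueueUserWorkItem(ignored =>
        {
            try
            {
                tcs.SetResult(6 * 7);   // publish the value...
            }
            catch (Exception e)
            {
                tcs.SetException(e);    // ...or publish the failure
            }
        });
        return tcs.Task;
    }
}
```

Asking for `ComputeOnAnotherThread().Result` blocks the caller until SetResult (or SetException, which surfaces as an exception) has been called on the other thread.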
Using the non-blocking aggregation operators
Now that we’ve got our extension methods, how can we use them? First I decided to do a demo which would count the number of lines in a file, and find the maximum and minimum line lengths:
private static IEnumerable<string> ReadLines(string filename)
{
    using (TextReader reader = File.OpenText(filename))
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            yield return line;
        }
    }
}
…
var subject = new Subject<string>();
var lengths = subject.Select(line => line.Length);
var min = lengths.FutureMin();
var max = lengths.FutureMax();
var count = lengths.FutureCount();

var source = ReadLines("../../Program.cs");
source.ToObservable(Scheduler.Immediate).Subscribe(subject);
Console.WriteLine("Count: {0}, Min: {1}, Max: {2}",
                  count.Result, min.Result, max.Result);
As you can see, we use the Result property of a task to find its eventual result – this call will block until the result is ready, however, so you do need to be careful about how you use it. Each line is only read from the file once, and pushed to all three observers, who carry their state around until the sequence is complete, whereupon they publish the result to the task.
I got this working fairly quickly – then went back to the "grouping lines by line length" problem I’d originally set myself. I want to group the lines of a file by their length (all lines of length 0, all lines of length 1 etc) and count each group. The result is effectively a histogram of line lengths. Constructing the query itself wasn’t a problem – but iterating through the results was. Fundamentally, I don’t understand the details of ToEnumerable yet, particularly the timing. I need to look into it more deeply, but I’ve got two alternative solutions for the moment.
The first is to implement my own ToList extension method. This simply creates a list and subscribes an observer which adds items to the list as it goes. There’s no attempt at "safety" here – if you access the list before the source sequence has completed, you’ll see whatever has been added so far. I am still just experimenting :) Here’s the implementation:
public static List<T> ToList<T>(this IObservable<T> source)
{
    List<T> ret = new List<T>();
    source.Subscribe(x => ret.Add(x));
    return ret;
}
Now we can construct a query expression, project each group using our future count, make sure we’ve finished pushing the source before we read the results, and everything is fine:
var subject = new Subject<string>();
var groups = from line in subject
             group line.Length by line.Length into grouped
             select new { Length = grouped.Key, Count = grouped.FutureCount() };
var results = groups.ToList();

var source = ReadLines("../../Program.cs");
source.ToObservable(Scheduler.Immediate).Subscribe(subject);

foreach (var group in results)
{
    Console.WriteLine("Length: {0}; Count: {1}",
                      group.Length, group.Count.Result);
}
Note how the call to ToList is required before calling source.ToObservable(...).Subscribe – otherwise everything would have been pushed before we started collecting it.
All well and good… but there’s another way of doing it too. We’ve only got a single task being produced for each group – instead of waiting until everything’s finished before we dump the results to the console, we can use Task.ContinueWith to write it (the individual group result) out as soon as that group has been told that it’s finished. We force this extra action to occur on the same thread as the observer just to make things easier in a console app… but it all works very neatly:
var subject = new Subject<string>();
var groups = from line in subject
             group line.Length by line.Length into grouped
             select new { Length = grouped.Key, Count = grouped.FutureCount() };
groups.Subscribe(group =>
{
    group.Count.ContinueWith(
        x => Console.WriteLine("Length: {0}; Count: {1}",
                               group.Length, x.Result),
        TaskContinuationOptions.ExecuteSynchronously);
});

var source = ReadLines("../../Program.cs");
source.ToObservable(Scheduler.Immediate).Subscribe(subject);
Conclusion
That’s the lot, so far. It feels like I’m sort of in the spirit of Reactive Extensions, but that maybe I’m pushing it (no pun intended) in a direction which Erik and Wes either didn’t anticipate, or at least don’t view as particularly valuable/elegant. I very much doubt that they didn’t consider deferred aggregates – it’s much more likely that either I’ve missed some easy way of doing this, or there are good reasons why it’s a bad idea. I hope to find out which at some point… but in the meantime, I really ought to work out a more idiomatic example for C# in Depth.