Update: I’ve got a new and simpler design now. I’m leaving this in for historical interest, but please see the entry about the new design for more recent information.
This post is going to be hard to write, simply because I can’t remember ever writing quite such bizarre code before. When I find something difficult to keep in my own head, explaining it to others is somewhat daunting, especially when blogging is so much more restrictive than face-to-face discussion. Oh, and you’ll find it hard if you’re not familiar with lambda expression syntax (x => x.Foo etc). Just thought I’d warn you.
It’s possibly easiest to explain with an example. It’s one I’m hoping to use in a magazine article – but I certainly won’t be trying to explain this in the article. Imagine you’ve got a lot of log entries on disk – by which I mean hundreds of millions. You certainly don’t want all of that in memory. However, each of the log entries contains a customer ID, and you want to find out for each customer how many entries there are. Here’s a LINQ query which would work but be horribly inefficient, loading everything into memory:
var query = from entry in logEntryReader
group entry by entry.Customer into entriesByCustomer
let count = entriesByCustomer.Count()
orderby count descending
select new { Customer = entriesByCustomer.Key, Count = count };
Now, it’s easy to improve this somewhat just by changing the “group entry by” to “group 1 by” – that way the entries themselves are thrown away. However, you’ve still got some memory per entry – a huge great enumeration of 1s to count after grouping.
The problem is that you can’t tell “group … by” how to aggregate the sequence associated with each key. This isn’t just because there’s no syntax to express it – it’s to do with the nature of IEnumerable itself. You see, the “pull” nature of IEnumerable is a problem. While a thread is waiting for more data, it will just block. Normally, an aggregator (like Count) just picks data off a sequence until it reaches the end, then returns the result. How can that work when there are multiple sequences involved (one for each customer)?
There are three answers to this:
1) Write your own group-and-count method. This is pretty straightforward, and potentially useful in many situations. It’s also fairly easy to understand. You just iterate through a sequence, and keep a dictionary of key to int, increasing each key’s count as you see elements. This is the pragmatic solution when faced with a specific problem – but it feels like there should be something better, something that lets us group and then specify the processing in terms of standard query operators.
2) Create a new thread and producer/consumer style IEnumerable for each key. Clearly this doesn’t scale.
3) Invert control of enumerations: put the producer in the driving seat instead of the consumer. This is the approach we’re talking about for the rest of the post.
A word on the term “asynchronous”
I don’t know whether my approach could truly be called asynchronous. What I haven’t done is make any of the code thread-aware at all, or even thread-safe. All the processing of multiple sequences happens in a single thread. I also don’t have the full BeginXXX, EndXXX using IAsyncResult pattern. I started down that line, but it ended up being a lot more complicated than what I’ve got now.
I’m pretty sure that what I’ve been writing is along the lines of CSPs (Communicating Sequential Processes) but I wouldn’t in any way claim that it’s a CSP framework, either.
However, you may find that it helps to think about asynchronous APIs like Stream.BeginRead when looking at the rest of the code. In particular, reading a stream asynchronously has the same “say I’m interested in data, react to data, request some more” pattern.
Keeping the Count aggregator in mind, what we want to do is maintain a private count, and request some data. When we get called back to say there is more data, we increment our count (ignoring the data) and request some more. When we are told that there’s no more data, we can return the count.
With that said, here’s the interface for what I’ve called IPushEnumerator. The name is open for change – I’ve been through a few options, and I’m still not comfortable with it. Please feel free to suggest another one! Note that there isn’t an IPushEnumerable – again, I started off with one, but found it didn’t make sense. Maybe someone smarter than me will come up with a way of it fitting.
IPushEnumerator
public interface IPushEnumerator<T>
{
T Current { get; }
void BeginMoveNext(Action<bool> callback);
}
That bit is relatively easy to understand. I can ask to be called back when there’s data, and typically I’ll fetch the data within the callback and ask for more.
So far, so good. But what’s going to create these in the first place? How do we interface with LINQ? Time for an extension method.
Enumerable.GroupWithPush
I wanted to create an extension to IEnumerable<T> which had a “LINQ feel” to it. It should be quite like GroupBy, but then allow the processing of the subsequences to be expressed in a LINQ-like way. (Actual C# query expressions aren’t terribly useful in practice because there isn’t specific syntax for the kind of operators which turn out to be useful with this approach.) We’ll want to have type parameters for the original sequence (TElement), the key used for grouping (TKey) and the results of whatever processing is performed on each sequence (TResult).
So, the first parameter of our extension method is going to be an IEnumerable<TElement>. We’ll use a Func<TElement,TKey> to map source elements to keys. We could optionally allow an IEqualityComparer<TKey> too – but I’m certainly not planning on supporting as many overloads as Enumerable.GroupBy does. The final parameter, however, needs to be something to process the subsequence. The first thought would be Func<IPushEnumerator<TElement>,TResult> – until you start trying to implement the extension method or indeed the delegate doing the processing.
You see, given an IPushEnumerator<TElement> you really don’t want to return a result. Not just yet. After all, you don’t have the data yet, just a way of being given the data. What you want to return is the means of the caller obtaining the result after all the data has been provided. This is where we need to introduce a Future<T>.
Future<T>
If you don’t know about the idea of a future, it’s basically an IOU for a result. In proper threading libraries, futures allow the user to find out whether a computation has completed or not, wait for the result etc. My implementation of Future<T> is not that smart. It’s not smart at all. Here it is:
public class Future<T>
{
T value;
bool valueSet =
false;
public T Value
{
get
{
if (!valueSet)
{
throw new InvalidOperationException(“No value has been set yet”);
}
return value;
}
set
{
valueSet = true;
this.value = value;
}
}
}
With this in place, we can reveal the actual signature of GroupWithPush:
public static IEnumerable<KeyValuePair<TKey, TResult>> GroupWithPush<TElement, TKey, TResult>
(this IEnumerable<TElement> source,
Func<TElement, TKey> mapping,
Func<IPushEnumerator<TElement>, Future<TResult>> pipeline)
I shall leave you to mull over that – I don’t know about you, but signatures of generic methods always take me a little while to decode.
The plan is to then implement extension methods on IPushEnumerator<T> so that we can write code like this:
var query = logEntryReader.GroupWithCount(entry => entry.Customer,
sequence => sequence.Count());
foreach (var result in query)
{
Console.WriteLine (“Customer {0}: {1} entries”,
result.Key,
result.Value);
}
Okay, so how do we implement these operators? Let’s give an example – Count being pretty a simple case.
Implementing Count()
Let’s start off by looking at a possible Count implementation for a normal sequence, to act as a sort of model for the implementation in the weird and wacky land of futures and push enumerators:
public static int Count<T>(IEnumerable<T> source)
{
int count = 0;
foreach (T item in source)
{
count++;
}
return count;
}
Now, we’ve got two problems. Firstly, we’re not going to return the count – we’re going to return a Future. Secondly, we certainly can’t use foreach on an IPushEnumerator<T> – the whole point is to avoid blocking while we wait for data. However, the concept of “for each element in a sequence” is useful – so let’s see whether we can do something similar with another extension method, then come back and use it in Count.
Implementing ForEach()
Warning: this code hurts my head, and I wrote it. Even the idea of it hurts my head a bit. The plan is to implement a ForEach method which takes two delegates – one which is called for each item in the enumerator, and one which is called after all the data has been processed. It will return without blocking, but it will call BeginMoveNext first, using a delegate of its own. That delegate will be called when data is provided, and it will in turn call the delegates passed in as parameters, before calling BeginMoveNext again, etc.
Ready?
public static void ForEach<T>(
this IPushEnumerator<T> source,
Action<T> iteration,
Action completion)
{
Action<
bool> moveNextCallback =
null;
moveNextCallback = dataAvailable =>
{
if (dataAvailable)
{
iteration(source.Current);
source.BeginMoveNext(moveNextCallback);
}
else {
completion();
}
};
source.BeginMoveNext(moveNextCallback);
}
What I find particularly disturbing is that moveNextCallback is self-referential – it calls BeginMoveNext passing itself a the parameter. (Interestingly, you still need to assign it to null first, otherwise the compiler complains that it might be used without being assigned. I seem to remember reading a blog post about this before now, and thinking that I’d never ever run into such a situation. Hmm.)
Nasty as ForEach is in terms of implementation, it’s not too bad to use.
Implementing Count() – the actual code
The translation of the original Count is now relatively straightforward. We prepare the Future wrapper for the result, and indicate that we want to iterate through all the entries, counting them and then setting the result value when we’ve finished (which will be long after the method first returns, don’t forget).
public static Future<
int> Count<T>(
this IPushEnumerator<T> source)
{
Future<
int> ret =
new Future<
int>();
int count = 0;
source.ForEach(t => count++,
() => ret.Value = count);
return ret;
}
We’re nearly there now. All we need to do is complete the original GroupWithPush method:
Implementing GroupWithPush
There are three phases to GroupWithPush, as mentioned before: pushing the data to the consumers (creating those consumers as required based on the keys we see); telling all the consumers that we’ve finished; retrieving the results. It’s probably easiest just to show the code – it’s actually not too hard to understand.
public static IEnumerable<KeyValuePair<TKey, TResult>> GroupWithPush<TElement, TKey, TResult>
(this IEnumerable<TElement> source,
Func<TElement, TKey> mapping,
Func<IPushEnumerator<TElement>, Future<TResult>> pipeline)
{
var enumerators = new Dictionary<TKey, SingleSlotPushEnumerator<TElement>>();
var results = new Dictionary<TKey, Future<TResult>>();
foreach (TElement element in source)
{
TKey key = mapping(element);
SingleSlotPushEnumerator<TElement> push;
if (!enumerators.TryGetValue(key, out push))
{
push = new SingleSlotPushEnumerator<TElement>();
results[key] = pipeline(push);
enumerators[key] = push;
}
push.Push(element);
}
foreach (SingleSlotPushEnumerator<TElement> push in enumerators.Values)
{
push.End();
}
foreach (var result in results)
{
yield return new KeyValuePair<TKey, TResult>(result.Key, result.Value.Value);
}
}
I haven’t introduced SingleSlotPushEnumerator before, but as you can imagine, it’s an implementation of IPushEnumerator, with Push() and End() methods to provide data or indicate the end of the data stream. It’s not terribly interesting to see, in my view.
Conclusion
So, that’s what I’ve been looking at and thinking about for the last few evenings. I’ve implemented quite a few of the standard query operators, although not all of them are worth doing. I’m not currently viewing this as anything more than an interesting exercise, partly in terms of seeing how far I can push the language, but if anyone thinks it’s worth pursuing further (e.g. as a complete implementation as far as sensibly possible, either in MiscUtil or on SourceForge) I’d be very happy to hear your ideas. Frankly, I’d be glad and slightly surprised just to find out that anyone made it this far.
Oh, exercise for the reader – draw out a sequence diagram of how all this behaves :)