Don’t call us, we’ll call you – push enumerators

Update: I’ve got a new and simpler design now. I’m leaving this in for historical interest, but please see the entry about the new design for more recent information.

This post is going to be hard to write, simply because I can’t remember ever writing quite such bizarre code before. When I find something difficult to keep in my own head, explaining it to others is somewhat daunting, especially when blogging is so much more restrictive than face-to-face discussion. Oh, and you’ll find it hard if you’re not familiar with lambda expression syntax (x => x.Foo etc). Just thought I’d warn you.

It’s possibly easiest to explain with an example. It’s one I’m hoping to use in a magazine article – but I certainly won’t be trying to explain this in the article. Imagine you’ve got a lot of log entries on disk – by which I mean hundreds of millions. You certainly don’t want all of that in memory. However, each of the log entries contains a customer ID, and you want to find out for each customer how many entries there are. Here’s a LINQ query which would work but be horribly inefficient, loading everything into memory:

var query = from entry in logEntryReader
            group entry by entry.Customer into entriesByCustomer
            let count = entriesByCustomer.Count()
            orderby count descending
            select new { Customer = entriesByCustomer.Key, Count = count };

Now, it’s easy to improve this somewhat just by changing the “group entry by” to “group 1 by” – that way the entries themselves are thrown away. However, you’ve still got some memory per entry – a huge great enumeration of 1s to count after grouping.

The problem is that you can’t tell “group … by” how to aggregate the sequence associated with each key. This isn’t just because there’s no syntax to express it – it’s to do with the nature of IEnumerable itself. You see, the “pull” nature of IEnumerable is a problem. While a thread is waiting for more data, it will just block. Normally, an aggregator (like Count) just picks data off a sequence until it reaches the end, then returns the result. How can that work when there are multiple sequences involved (one for each customer)?

There are three answers to this:

1) Write your own group-and-count method. This is pretty straightforward, and potentially useful in many situations. It’s also fairly easy to understand. You just iterate through a sequence, and keep a dictionary of key to int, increasing each key’s count as you see elements. This is the pragmatic solution when faced with a specific problem – but it feels like there should be something better, something that lets us group and then specify the processing in terms of standard query operators.

2) Create a new thread and producer/consumer style IEnumerable for each key. Clearly this doesn’t scale.

3) Invert control of enumerations: put the producer in the driving seat instead of the consumer. This is the approach we’re talking about for the rest of the post.
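Before diving into option 3, option 1 deserves a quick sketch, since it's the pragmatic baseline we're trying to beat. This is just an illustration (the method name and shape are my own, not part of any framework):

```csharp
// Option 1: one pass over the data, one dictionary entry per key.
public static Dictionary<TKey, int> GroupAndCount<TElement, TKey>(
    this IEnumerable<TElement> source,
    Func<TElement, TKey> keySelector)
{
    var counts = new Dictionary<TKey, int>();
    foreach (TElement element in source)
    {
        TKey key = keySelector(element);
        int count;
        counts.TryGetValue(key, out count); // leaves count as 0 for a new key
        counts[key] = count + 1;
    }
    return counts;
}
```

This only ever holds one integer per key, which is exactly the memory profile we want. It just can't be composed with other query operators, which is what the rest of this post is about.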

A word on the term “asynchronous”

I don’t know whether my approach could truly be called asynchronous. What I haven’t done is make any of the code thread-aware at all, or even thread-safe. All the processing of multiple sequences happens in a single thread. I also don’t have the full BeginXXX, EndXXX using IAsyncResult pattern. I started down that line, but it ended up being a lot more complicated than what I’ve got now.

I’m pretty sure that what I’ve been writing is along the lines of CSPs (Communicating Sequential Processes) but I wouldn’t in any way claim that it’s a CSP framework, either.

However, you may find that it helps to think about asynchronous APIs like Stream.BeginRead when looking at the rest of the code. In particular, reading a stream asynchronously has the same “say I’m interested in data, react to data, request some more” pattern.

Keeping the Count aggregator in mind, what we want to do is maintain a private count, and request some data. When we get called back to say there is more data, we increment our count (ignoring the data) and request some more. When we are told that there’s no more data, we can return the count.

With that said, here’s the interface for what I’ve called IPushEnumerator. The name is open for change – I’ve been through a few options, and I’m still not comfortable with it. Please feel free to suggest another one! Note that there isn’t an IPushEnumerable – again, I started off with one, but found it didn’t make sense. Maybe someone smarter than me will come up with a way of it fitting.

IPushEnumerator

/// <summary>
/// An enumerator which works in an inverse manner – the consumer requests
/// to be notified when a value is available, and then the enumerator
/// will call back to the consumer when the producer makes some data
/// available or indicates that all the data has been produced.
/// </summary>
public interface IPushEnumerator<T>
{
    /// <summary>
    /// Fetch the current value. Not valid until
    /// a callback with a True argument has occurred,
    /// or after a callback with a False argument.
    /// </summary>
    T Current { get; }

    /// <summary>
    /// Requests notification (via the specified callback) when more
    /// data is available or when the end of the data has been reached.
    /// The argument passed to the callback indicates which of these
    /// conditions has been met – logically the result of MoveNext
    /// on a normal IEnumerator.
    /// </summary>
    void BeginMoveNext(Action<bool> callback);
}

That bit is relatively easy to understand. I can ask to be called back when there’s data, and typically I’ll fetch the data within the callback and ask for more.

So far, so good. But what’s going to create these in the first place? How do we interface with LINQ? Time for an extension method.

Enumerable.GroupWithPush

I wanted to create an extension to IEnumerable<T> which had a “LINQ feel” to it. It should be quite like GroupBy, but then allow the processing of the subsequences to be expressed in a LINQ-like way. (Actual C# query expressions aren’t terribly useful in practice because there isn’t specific syntax for the kind of operators which turn out to be useful with this approach.) We’ll want to have type parameters for the original sequence (TElement), the key used for grouping (TKey) and the results of whatever processing is performed on each sequence (TResult).

So, the first parameter of our extension method is going to be an IEnumerable<TElement>. We’ll use a Func<TElement,TKey> to map source elements to keys. We could optionally allow an IEqualityComparer<TKey> too – but I’m certainly not planning on supporting as many overloads as Enumerable.GroupBy does. The final parameter, however, needs to be something to process the subsequence. The first thought would be Func<IPushEnumerator<TElement>,TResult> – until you start trying to implement the extension method or indeed the delegate doing the processing.

You see, given an IPushEnumerator<TElement> you really don’t want to return a result. Not just yet. After all, you don’t have the data yet, just a way of being given the data. What you want to return is the means of the caller obtaining the result after all the data has been provided. This is where we need to introduce a Future<T>.

Future<T>

If you don’t know about the idea of a future, it’s basically an IOU for a result. In proper threading libraries, futures allow the user to find out whether a computation has completed or not, wait for the result etc. My implementation of Future<T> is not that smart. It’s not smart at all. Here it is:

/// <summary>
/// Poor-man’s version of a Future. This wraps a result which *will* be
/// available in the future. It’s up to the caller/provider to make sure
/// that the value has been specified by the time it’s requested.
/// </summary>
public class Future<T>
{
    T value;
    bool valueSet = false;

    public T Value 
    {
        get
        {
            if (!valueSet)
            {
                throw new InvalidOperationException("No value has been set yet");
            }
            return value;
        }
        set
        {
            valueSet = true;
            this.value = value;
        }
    }
}
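Just to make the contract concrete:

```csharp
Future<int> future = new Future<int>();
// Reading future.Value here would throw InvalidOperationException
future.Value = 42;
Console.WriteLine(future.Value); // 42
```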

With this in place, we can reveal the actual signature of GroupWithPush:

public static IEnumerable<KeyValuePair<TKey, TResult>> GroupWithPush<TElement, TKey, TResult>
    (this IEnumerable<TElement> source,
     Func<TElement, TKey> mapping,
     Func<IPushEnumerator<TElement>, Future<TResult>> pipeline)

I shall leave you to mull over that – I don’t know about you, but signatures of generic methods always take me a little while to decode.

The plan is to then implement extension methods on IPushEnumerator<T> so that we can write code like this:

var query = logEntryReader.GroupWithPush(entry => entry.Customer,
                                         sequence => sequence.Count());

foreach (var result in query)
{
    Console.WriteLine ("Customer {0}: {1} entries",
                       result.Key,
                       result.Value);
}

Okay, so how do we implement these operators? Let’s give an example – Count being a pretty simple case.

Implementing Count()

Let’s start off by looking at a possible Count implementation for a normal sequence, to act as a sort of model for the implementation in the weird and wacky land of futures and push enumerators:

public static int Count<T>(IEnumerable<T> source)
{
    int count = 0;
    foreach (T item in source)
    {
        count++;
    }
    return count;
}

Now, we’ve got two problems. Firstly, we’re not going to return the count – we’re going to return a Future. Secondly, we certainly can’t use foreach on an IPushEnumerator<T> – the whole point is to avoid blocking while we wait for data. However, the concept of “for each element in a sequence” is useful – so let’s see whether we can do something similar with another extension method, then come back and use it in Count.

Implementing ForEach()

Warning: this code hurts my head, and I wrote it. Even the idea of it hurts my head a bit. The plan is to implement a ForEach method which takes two delegates – one which is called for each item in the enumerator, and one which is called after all the data has been processed. It will return without blocking, but it will call BeginMoveNext first, using a delegate of its own. That delegate will be called when data is provided, and it will in turn call the delegates passed in as parameters, before calling BeginMoveNext again, etc.

Ready?

public static void ForEach<T>(this IPushEnumerator<T> source, 
                              Action<T> iteration,
                              Action completion)
{
    Action<bool> moveNextCallback = null;
            
    moveNextCallback = dataAvailable =>
         {
             if (dataAvailable)
             {
                 iteration(source.Current);
                 source.BeginMoveNext(moveNextCallback);
             }
             else
             {
                 completion();
             }
         };

    source.BeginMoveNext(moveNextCallback);
}

What I find particularly disturbing is that moveNextCallback is self-referential – it calls BeginMoveNext passing itself as the parameter. (Interestingly, you still need to assign it to null first, otherwise the compiler complains that it might be used without being assigned. I seem to remember reading a blog post about this before now, and thinking that I’d never ever run into such a situation. Hmm.)

Nasty as ForEach is in terms of implementation, it’s not too bad to use.
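To see it in action, here's a tiny usage sketch. It assumes a SingleSlotPushEnumerator (which I'll mention properly later) whose Push and End methods invoke the pending callback with true and false respectively:

```csharp
var push = new SingleSlotPushEnumerator<string>();

// Say what to do with each item, and what to do at the end.
push.ForEach(item => Console.WriteLine("Got: {0}", item),
             () => Console.WriteLine("All done"));

push.Push("first");   // prints "Got: first"
push.Push("second");  // prints "Got: second"
push.End();           // prints "All done"
```

Note that ForEach has already returned by the time the first Push happens – the producer is driving everything from that point on.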

Implementing Count() – the actual code

The translation of the original Count is now relatively straightforward. We prepare the Future wrapper for the result, and indicate that we want to iterate through all the entries, counting them and then setting the result value when we’ve finished (which will be long after the method first returns, don’t forget).

public static Future<int> Count<T>(this IPushEnumerator<T> source)
{
    Future<int> ret = new Future<int>();
    int count = 0;

    source.ForEach(t => count++, 
                   () => ret.Value = count);
    return ret;
}
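Other aggregators follow exactly the same shape. For instance, a Sum over int sequences could look like this (my sketch of how such an operator would go, following the same pattern):

```csharp
public static Future<int> Sum(this IPushEnumerator<int> source)
{
    Future<int> ret = new Future<int>();
    int sum = 0;

    // Add each value as it arrives; publish the total at the end.
    source.ForEach(x => sum += x,
                   () => ret.Value = sum);
    return ret;
}
```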

We’re nearly there now. All we need to do is complete the original GroupWithPush method:

Implementing GroupWithPush

There are three phases to GroupWithPush: pushing the data to the consumers (creating those consumers as required based on the keys we see); telling all the consumers that we’ve finished; and retrieving the results. It’s probably easiest just to show the code – it’s actually not too hard to understand.

public static IEnumerable<KeyValuePair<TKey, TResult>> GroupWithPush<TElement, TKey, TResult>
    (this IEnumerable<TElement> source,
     Func<TElement, TKey> mapping,
     Func<IPushEnumerator<TElement>, Future<TResult>> pipeline)
{
    var enumerators = new Dictionary<TKey, SingleSlotPushEnumerator<TElement>>();
    var results = new Dictionary<TKey, Future<TResult>>();
    // Group the data, pushing it to the enumerators at the same time.
    foreach (TElement element in source)
    {
        TKey key = mapping(element);
        SingleSlotPushEnumerator<TElement> push;
        if (!enumerators.TryGetValue(key, out push))
        {
            push = new SingleSlotPushEnumerator<TElement>();
            results[key] = pipeline(push);
            enumerators[key] = push;
        }
        push.Push(element);
    }
    // Indicate to all the enumerators that we’ve finished
    foreach (SingleSlotPushEnumerator<TElement> push in enumerators.Values)
    {
        push.End();
    }
    // Collect the results, converting Future<T> into T for each one.
    foreach (var result in results)
    {
        yield return new KeyValuePair<TKey, TResult>(result.Key, result.Value.Value);
    }
}

I haven’t introduced SingleSlotPushEnumerator before, but as you can imagine, it’s an implementation of IPushEnumerator, with Push() and End() methods to provide data or indicate the end of the data stream. It’s not terribly interesting to see, in my view.
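That said, for completeness, here's roughly what a minimal version could look like. Treat this as a sketch of the idea rather than the exact class; the one subtlety is clearing the stored callback before invoking it, so that BeginMoveNext can safely be called again from inside the callback:

```csharp
public class SingleSlotPushEnumerator<T> : IPushEnumerator<T>
{
    T current;
    Action<bool> callback;

    public T Current
    {
        get { return current; }
    }

    public void BeginMoveNext(Action<bool> callback)
    {
        // Just remember the callback; Push or End will invoke it.
        this.callback = callback;
    }

    public void Push(T item)
    {
        current = item;
        Action<bool> cb = callback;
        callback = null; // allow BeginMoveNext to be called from within cb
        cb(true);        // "more data is available"
    }

    public void End()
    {
        Action<bool> cb = callback;
        callback = null;
        cb(false);       // "end of data"
    }
}
```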

Conclusion

So, that’s what I’ve been looking at and thinking about for the last few evenings. I’ve implemented quite a few of the standard query operators, although not all of them are worth doing. I’m not currently viewing this as anything more than an interesting exercise, partly in terms of seeing how far I can push the language, but if anyone thinks it’s worth pursuing further (e.g. as a complete implementation as far as sensibly possible, either in MiscUtil or on SourceForge) I’d be very happy to hear your ideas. Frankly, I’d be glad and slightly surprised just to find out that anyone made it this far.

Oh, exercise for the reader – draw out a sequence diagram of how all this behaves :)

8 thoughts on “Don’t call us, we’ll call you – push enumerators”

  1. Okay, I admit. I haven’t managed to read through this whole post. I also don’t know enough LINQ to make it likely I really understand every detail. However, I will point out that of the three possible solutions suggested for counting the data by some key, the #2 is actually not as bad as it seems and well-supported in Windows.

    I agree that creating a new thread for each key won’t scale well, but that’s why Windows has i/o completion ports. You can assign some fixed number of threads to handle the i/o completion ports, and create one port per key enumerator. As long as the enumeration is managed per-port, no direct relationship between a thread and an enumeration key has to exist, allowing the algorithm to scale while still using threads to maximize performance.

    The OS will process the i/o as fast as it can, allowing some small number of threads (usually on the order of the number of CPU cores…in this case, since each completed i/o can be processed quickly, the extra threads that are sometimes used with IOCP wouldn’t be needed) to run continuously, handling the i/o as it happens.

    Of course, all this assumes that the data has some efficient by-key retrieval I think. Surely if it’s actually a linear, unindexed log file it would be just fine to just read through the file from start to finish and use an in-memory hash-table (e.g. Dictionary) to count instances?

    This whole thing is starting to sound kind of familiar… :)

  2. BTW, another comment about formatting, if you don’t mind. I’m finding that in my browser windows, which I tend to keep in a “portrait” aspect ratio (taller than wide), the code and comments within a “div.code” section spill out of the div. In extreme cases, the text is truncated by the browser window. :(

    This happens in both Mac Opera 9 and Mac Safari 2.

  3. Okay, I got to the end. I pretty much get how the enumerator stuff works. Unfortunately, lacking any useful LINQ or extension method knowledge, I’m at a loss as to how this GroupWithPush method gets used. :)

    While it might not completely solve my dilemma, the article might be served well with a simple example of how everything gets hooked up in the end.

    Seems to me you might as well include the code for SingleSlotPushEnumerator also, but since that one I think I can figure out on my own, I’m less emphatic about that. :)

  4. Interesting article! I think that changing the terminology a bit simplifies things considerably.

    Instead of IPushEnumerator, let’s consider IAggregator:

    public interface IAggregator<TElement, TResult>
    {
        TResult Result { get; }

        void ConsumeElement(TElement element);
    }

    IAggregator is similar to your IPushEnumerator. This would be a class implementing IAggregator:

    public class Aggregator<TElement, TResult> : IAggregator<TElement, TResult>
    {
        Func<TResult, TElement, TResult> aggregateFunc;
        TResult result;

        public Aggregator(TResult initValue, Func<TResult, TElement, TResult> aggregateFunc)
        {
            this.result = initValue;
            this.aggregateFunc = aggregateFunc;
        }

        public TResult Result
        {
            get { return this.result; }
        }

        public void ConsumeElement(TElement element)
        {
            this.result = this.aggregateFunc(this.result, element);
        }
    }

    Instead of your ForEach, let’s have Aggregate():

    public static TResult Aggregate<TElement, TResult>(
        this IEnumerable<TElement> source,
        TResult initialValue,
        Func<TResult, TElement, TResult> aggregateFunc)
    {
        Aggregator<TElement, TResult> aggregator =
            new Aggregator<TElement, TResult>(initialValue, aggregateFunc);

        foreach (TElement el in source)
            aggregator.ConsumeElement(el);

        return aggregator.Result;
    }

    Then, both Count and GroupWithPush are quite simple to implement using the Aggregate extension method above.

    What do you think about this abstraction? I think it is simpler to wrap your head around.

    Disclaimer: I didn’t actually check whether any of the code I wrote compiles.

  5. I see the general idea, I think – effectively take one level of abstraction out of the equation.

    I’m currently at a slight loss to work out what Count should be like. Also, can this compose well? For instance, could I aggregate over:

    subsequence.Skip(3).Sum()

    ?

    Jon

  6. Peter – yes, I can see the bleed too. Not sure the best thing to do about it, but I’ll investigate. Ideally I suppose we’d get horizontal scrollbars – I don’t really want it to wrap.

    As for an example of use – it’s there, but possibly buried a bit:

    var query = logEntryReader.GroupWithPush
        (entry => entry.Customer,
         sequence => sequence.Count());

    foreach (var result in query)
    {
        Console.WriteLine
            ("Customer {0}: {1} entries",
             result.Key,
             result.Value);
    }

    It can be a whole chain though. For instance, to have an aggregate of the sum of the first three entries of each subsequence:

    someSequence.GroupWithPush(entry => entry.Key,
                               subsequence => subsequence.Take(3).Sum())

    Does that help?

    Jon

  7. Igor: thinking further, I don’t think that you *can* take out the extra level of abstraction. It’s that level which goes from “pull” to “push”. There has to be something for Count() to be an extension method on, and it can’t be IEnumerable (which has “pull” semantics).

    However, I think it *is* worth changing the interface significantly – Marc Gravell and I have been discussing this by mail, and we’ve got some interesting ideas…

    Jon

  8. It looks like you’re trying to implement what would be a natural imperative implementation (scan thru elements, create new group agg as needed, then increment group) and retrofit it into the LINQ syntax.
    I’m still trying to wrap my head around the code ;-)
