Group pipelining returns: new and improved design

Last night’s blog post provoked a flurry of emails between Marc Gravell and me. Looking back, trying to base the pipeline on a pseudo-asynchronous version of IEnumerable<T> was a mistake. We’ve now got a much more attractive interface to write extensions against:


public interface IDataProducer<T>
{
    event Action<T> DataProduced;
    event Action EndOfData;
}

Why is this so much better? A few reasons:

  1. Acting on all the data is much easier: just subscribe to the events. No need for a weird ForEach.
  2. It fits the normal processing model much better – we tend to want to process the data, not ask whether more elements are on their way.
  3. It allows unsubscription when we’re not interested in any more data.
  4. It allows multiple subscribers.
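
To make those points concrete, here is a minimal sketch of a producer with two subscribers, one of which unsubscribes part-way through. SimpleProducer, Produce, End and Demo are hypothetical names invented for this illustration; only the IDataProducer<T> interface itself comes from the design above.

```csharp
using System;

public interface IDataProducer<T>
{
    event Action<T> DataProduced;
    event Action EndOfData;
}

// Hypothetical producer, for illustration only: callers push items in by hand.
public class SimpleProducer<T> : IDataProducer<T>
{
    public event Action<T> DataProduced;
    public event Action EndOfData;

    public void Produce(T item)
    {
        Action<T> handler = DataProduced;
        if (handler != null) handler(item);
    }

    public void End()
    {
        Action handler = EndOfData;
        if (handler != null) handler();
    }
}

public static class Demo
{
    public static string Run()
    {
        var producer = new SimpleProducer<int>();
        int sum = 0;
        int count = 0;
        string report = null;

        Action<int> adder = x => sum += x;
        Action<int> counter = x => count++;

        // Two independent subscribers see the same data.
        producer.DataProduced += adder;
        producer.DataProduced += counter;
        producer.EndOfData += () => report = string.Format("Sum={0}, Count={1}", sum, count);

        producer.Produce(10);
        producer.Produce(20);
        producer.DataProduced -= counter; // the counter has seen enough
        producer.Produce(30);
        producer.End();
        return report;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "Sum=60, Count=2"
    }
}
```

The adder sees all three values, while the counter only sees the two produced before it unsubscribed.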

The last point is very, very interesting. It means we can implement GroupWithPipeline (current new name – I suspect it won’t last forever though) to take multiple pipelines, producing a KeyValueTuple with, say, three values in, each of different types, still strongly typed. If we don’t care about the type safety, we can use “params…” to deal with as many pipelines as we want – the client then needs to cast, which isn’t ideal, but isn’t too bad.

As an idea of what I mean by this, consider this call to find the Max, Min and Average values, all without buffering:


var query = someSequenceOfOrders
                .GroupWithPipeline (entry => entry.Customer,
                                    seq => seq.Min(entry => entry.OrderSize),
                                    seq => seq.Average(entry => entry.OrderSize),
                                    seq => seq.Max(entry => entry.OrderSize))
                .Select(x => new { Customer = x.Key,
                                   Min = x.Value1,
                                   Average = x.Value2,
                                   Max = x.Value3 });
foreach (var result in query)
{
    Console.WriteLine ("Customer {0}: {1}/{2}/{3}",
                       result.Customer, result.Min, result.Average, result.Max);
}

We specify three different pipelines, all of which will be applied to the same sequence of data. The fact that we’ve specified OrderSize three times is unfortunate – a new overload to transform the entries passed to the pipeline is probably in order – but it’s all doable.
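
As a rough idea of how several aggregates can share one pass over the data, here is a sketch with Min and Max written as subscribers to the same producer. It is not the real implementation (which hasn't been written at this point): Future<T>, SequenceProducer<T>, PushAll and the Aggregates class are all hypothetical, and the aggregates are hard-wired to int to keep the sketch short.

```csharp
using System;
using System.Collections.Generic;

public interface IDataProducer<T>
{
    event Action<T> DataProduced;
    event Action EndOfData;
}

// Hypothetical holder for a result that is only known once the data has ended.
public class Future<T>
{
    public T Value { get; set; }
}

// Hypothetical producer that pushes an in-memory sequence to its subscribers.
public class SequenceProducer<T> : IDataProducer<T>
{
    public event Action<T> DataProduced;
    public event Action EndOfData;

    public void PushAll(IEnumerable<T> source)
    {
        foreach (T item in source)
        {
            if (DataProduced != null) DataProduced(item);
        }
        if (EndOfData != null) EndOfData();
    }
}

public static class Aggregates
{
    // Each aggregate keeps only a running value, so nothing is buffered.
    // For an empty sequence this sketch simply reports 0.
    public static Future<int> Min(this IDataProducer<int> source)
    {
        var result = new Future<int>();
        bool any = false;
        int best = 0;
        source.DataProduced += x =>
        {
            if (!any || x < best) best = x;
            any = true;
        };
        source.EndOfData += () => result.Value = best;
        return result;
    }

    public static Future<int> Max(this IDataProducer<int> source)
    {
        var result = new Future<int>();
        bool any = false;
        int best = 0;
        source.DataProduced += x =>
        {
            if (!any || x > best) best = x;
            any = true;
        };
        source.EndOfData += () => result.Value = best;
        return result;
    }
}

public static class Demo
{
    public static string Run()
    {
        var orderSizes = new SequenceProducer<int>();
        // Both aggregates subscribe before any data flows.
        Future<int> min = orderSizes.Min();
        Future<int> max = orderSizes.Max();
        orderSizes.PushAll(new[] { 5, 3, 9, 1 });
        return string.Format("{0}/{1}", min.Value, max.Value);
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "1/9"
    }
}
```

The important detail is the ordering: every pipeline has to subscribe before the data is pushed, because a push-based producer won't replay anything for latecomers.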

This sort of “in pipeline” multiple aggregation is very, very cool IMO. It’s turned the whole idea from “interesting” to “useful enough to get into MiscUtil”.

I haven’t actually written Min, Max or Average yet – although Marc has, I believe. (We’re collaborating and sharing source, but not working off a common source control system yet. It’s all a bit ad hoc.) What I know he has done, and what may be even more useful than all of this to start with, is use expression trees to implement generic maths. The operators are only checked at execution time, which is unfortunate, but I don’t believe that will be a problem in real life.

The upshot is that the above code will work with any type with appropriate operators defined. No need for loads of overloads for decimal, long, int, float, double etc – it will just work. If you’re worried about performance, you can relax – it performs very, very well, which was a bit of a surprise to both of us.
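
For anyone who hasn’t seen the trick, here is a minimal sketch of generic addition built with an expression tree. It is not Marc’s code: GenericMath<T>, BuildAdd and the Sum helper are hypothetical names, and only the System.Linq.Expressions calls are the real framework API.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical helper; the name and shape are assumptions for this sketch.
public static class GenericMath<T>
{
    // Built and compiled once per T. If T has no + operator, building the
    // tree fails at execution time rather than at compile time.
    public static readonly Func<T, T, T> Add = BuildAdd();

    private static Func<T, T, T> BuildAdd()
    {
        ParameterExpression left = Expression.Parameter(typeof(T), "left");
        ParameterExpression right = Expression.Parameter(typeof(T), "right");
        BinaryExpression body = Expression.Add(left, right);
        return Expression.Lambda<Func<T, T, T>>(body, left, right).Compile();
    }
}

public static class Demo
{
    // One method covers every type with a suitable + operator.
    // Starting from default(T) assumes that is the additive zero,
    // which holds for the value types used below.
    public static T Sum<T>(params T[] values)
    {
        T total = default(T);
        foreach (T value in values)
        {
            total = GenericMath<T>.Add(total, value);
        }
        return total;
    }

    public static void Main()
    {
        Console.WriteLine(Sum(1, 2, 3));
        Console.WriteLine(Sum(1.5m, 2.5m));
        Console.WriteLine(Sum(TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(2)));
    }
}
```

Because the delegate is cached in a static field of a generic class, the cost of building and compiling the tree is paid once per type, and each call afterwards is an ordinary delegate invocation.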

More of that in another post though… I wanted to share the new design because it’s so much nicer than the deeply complicated stuff I was working with yesterday.
