“Push” LINQ revisited – next attempt at an explanation

Marc Gravell and I have now implemented a lot of the LINQ standard query operators on the “push” model of IDataProducer, as opposed to the “pull” model of IEnumerable. My good friend Douglas Leeder (who doesn’t use C#) has been with me this weekend, and through explaining the “big picture” to him in various ways and taking his feedback, I think I’ve now got a good way of communicating it: voting.

It’s a “real life” analogy, which is always dangerous – don’t take it too literally; I’m not claiming it’s an absolute 1:1 correspondence. However, I think it neatly demonstrates the problem and some of the benefits of the solution we’ve come up with.

In order to make all this concrete, all of the code is real, and can be downloaded as a zip of a VS2008 solution. It contains a binary of an unreleased version of MiscUtil which is where the DataProducer stuff currently lives.

Real life situation

Let’s suppose we’re trying to find out the favourite colour of everyone in the world. Now, for the purposes of the demo code, there are only four colours and six people in the world: that makes the diagrams nice and easy, and we can see the results simply too. Extending the data to the rest of the real world is left as an exercise for the reader. We may also want additional information, such as the age range of people voting for each colour.

Here’s our complete sample data – the five members of my family, and Douglas:

Name     Age  Favourite colour
Jon      31   Blue
Douglas  28   red
Holly    31   Purple
Tom      4    Pink
Robin    1    RED
William  1    blue

Note how the colours are specified with variations of case. We’ll use that later as an example of why you might need to specify a “key comparer”.

There are various ways of implementing this in LINQ, and for each model we’ll provide code and think about how it would work in the real world.

Model 1: “Pull” model

This is the model which “normal” LINQ uses – you only ever pull data, using an IEnumerable<T>. Here’s a simple query expression which gives the answers we want (admittedly unordered – we’ll ignore that for now):

 

var query = from voter in Voter.AllVoters()
            group voter by voter.FavouriteColour.ToUpper() into grouped
            select new { Colour = grouped.Key, Votes = grouped.Count() };

foreach (var entry in query)
{
    Console.WriteLine("Colour {0} has {1} votes", entry.Colour, entry.Votes);
}

There are two problems here.

Firstly, we’re using ToUpper() to get round the “RED != red” problem. This is not only bad in terms of internationalisation, but it also loses data. We really want to get the original string as the key, and then use a case-insensitive comparer. We can do this by a manual call to GroupBy instead of using query expressions – there’s an overload which takes an IEqualityComparer.
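For example, here’s a sketch of that manual call, using the sample’s Voter type and one of the framework’s built-in comparers:

var query = Voter.AllVoters()
                 .GroupBy(voter => voter.FavouriteColour,
                          StringComparer.CurrentCultureIgnoreCase)
                 .Select(grouped => new { Colour = grouped.Key, Votes = grouped.Count() });

Each group’s key is now the first spelling encountered (“Blue”, “red” and so on), while the grouping itself ignores case.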

Secondly, the result of the “group … by” keeps all the voter data temporarily. It has to all be available at the same time before the “select” kicks in. This runs counter to the normal “streaming” idea of LINQ. This is inherent in the nature of the “pull” model, as I’ll explain in a minute.

Now, let’s see what this looks like in the real world. People come into a room through a door, and a “grouper” asks them for their favourite colour. The grouper then tells each voter (immediately) which corner of the room to stand in. The result at the end of the grouping is this:

After the initial grouping, another person goes to each group in turn, finding out their key and doing a head count. That group is then free to go. The important thing is that this person can’t do their job until all the data is in, because they’ve got to be able to see everyone in order to count them.
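You can see that buffering in miniature with ordinary LINQ to Objects. This throwaway sketch (purely illustrative) logs each “pull” as a side effect:

var source = Enumerable.Range(0, 5)
                       .Select(x => { Console.WriteLine("Pulling {0}", x); return x; });

foreach (var group in source.GroupBy(x => x % 2))
{
    // By the time we get here, "Pulling 0" to "Pulling 4" have all been
    // printed: GroupBy consumed the entire source before yielding a group.
    Console.WriteLine("Group {0} has {1} items", group.Key, group.Count());
}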

Improvement to pull model: just keep a token presence

The fact that we used “group voter by …” meant that the result of the grouping still involved whole people. As we’re only going to do a head count, we only need something saying “There was a person here.” We can change our original query to do that quite easily:

 

var query = from voter in Voter.AllVoters()
            group 1 by voter.FavouriteColour.ToUpper() into grouped
            select new { Colour = grouped.Key, Votes = grouped.Count() };

foreach (var entry in query)
{
    Console.WriteLine("Colour {0} has {1} votes", entry.Colour, entry.Votes);
}

 

This time, after the grouping takes place, the room looks like this:

The use of 1 here is purely incidental: we could have used ‘group “Spartacus” by …’ and the results would have been the same. It’s just something which can be counted.

Now, there’s good and bad here:

  • Good: we’re not taking as much memory. If voters have large amounts of data attached to them, we’ve reduced our requirements significantly.
  • Bad: we still have one object per voter, all in memory at the same time. Think “population of the world”.
  • Bad: we’ve lost our age data, which would make any extra aggregation impossible.

Model 2: “Push” model

The problem with the pull model is that each aggregator always wants to be the only thing pulling. The call to MoveNext will block until more data is available. That’s a real problem when you want to have multiple aggregators (one vote counter per colour). We could do a complicated threading manoeuvre, with each colour getting its own thread and the “grouper” pushing items out to relevant threads. Again though, that doesn’t scale – the four extra threads in our example aren’t too bad, but imagine other groupings with potentially thousands of keys.

The alternative is to change the model. Instead of having a greedy aggregator pulling data, we change to aggregators who observe data being pushed past them, and also observe a special “all the data has now been pushed” signal. Before we look at the code to do this, let’s think about what it could be like in real life. We don’t know how many different colours will be voted on, but we know what we need to do with each one: count the number of votes for them. In detail, the situation would be something like this:

  • The grouper stands just inside the door of the room, and “pulls” voters in the normal way
  • For any voter:
    • Ask the voter which colour they wish to vote for
    • Check to see if that colour is a “new” one. If it is, create a “counter” person for that colour, and position them by an exit in the room. (We create new exits as we go. We’ll assume there’s a sledgehammer at the ready.)
    • Send the voter past the relevant “counter” person, through the exit near them
    • Each counter just counts how many voters they see going past them
  • When all voters have been pulled, tell each of the counters and ask them how many people they saw

We never have more than one voter in the room at once.
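Mechanically, the grouper boils down to keeping one counter per distinct key. Here’s a minimal sketch of that single pass (not the library implementation – just the idea, using the sample’s Voter type):

static Dictionary<string, int> CountVotes(IEnumerable<Voter> voters,
                                          IEqualityComparer<string> comparer)
{
    var counters = new Dictionary<string, int>(comparer);
    foreach (Voter voter in voters) // "pull" each voter through the door
    {
        // Send the voter past the relevant counter, creating it on first sight
        int count;
        counters.TryGetValue(voter.FavouriteColour, out count);
        counters[voter.FavouriteColour] = count + 1;
    }
    return counters;
}

Only the counters survive the loop; each voter is forgotten as soon as they’ve walked past. The real implementation generalises “counter” to an arbitrary observer.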

Let’s have a look at the code involved now.

Using the “push” model

There are two sides to the code here: the code that the LINQ user has to write, and the code Marc Gravell and I have implemented. We’ll look at the client code in a few different scenarios.

1) GroupWithPipeline in the middle of normal LINQ

Keeping to the normal “start with a data source, do something, then select” model involves stepping away from query expressions. We’ve got a new extension method on IEnumerable<T> called GroupWithPipeline, which takes a key selector (just like the normal GroupBy) and what to do with the results of each grouping. Here’s the new code (which requires a using directive for MiscUtil.Linq.Extensions):

 

var query = Voter.AllVoters()
                 .GroupWithPipeline(voter => voter.FavouriteColour.ToUpper(),
                                    voters => voters.Count())
                 .Select(grouped => new { Colour = grouped.Key, Votes = grouped.Value });

foreach (var entry in query)
{
    Console.WriteLine("Colour {0} has {1} votes", entry.Colour, entry.Votes);
}

How about making this a bit smarter now? Let’s try to also work out the minimum and maximum ages of the voters for each colour. Conceptually this is just a case of adding extra observers along with each vote counter in the “real life” model above. The code is remarkably simple:

 

var query = Voter.AllVoters()
                 .GroupWithPipeline(voter => voter.FavouriteColour.ToUpper(),
                                    voters => voters.Count(),
                                    voters => voters.Min(voter => voter.Age),
                                    voters => voters.Max(voter => voter.Age))
                 .Select(grouped => new { Colour = grouped.Key,
                                          Votes = grouped.Value1,
                                          MinAge = grouped.Value2,
                                          MaxAge = grouped.Value3});

foreach (var entry in query)
{
    Console.WriteLine("Colour {0} has {1} votes. Age range: {2}-{3}",
                      entry.Colour, entry.Votes, entry.MinAge, entry.MaxAge);
}

The fact that it uses “Value1”, “Value2” and “Value3” isn’t ideal, but unfortunately we haven’t found a way round that for this part.

2) Using DataProducer directly for multiple aggregates

GroupWithPipeline uses a few types internally which you can use directly instead: DataProducer (implementing IDataProducer) and Future (implementing IFuture). If I go into the details here, I’ll never get this posted – but they may come in another post if there’s enough interest. However, let’s have a look at how it can be used. First, let’s find the results of a few aggregates of our voters, this time without any groupings:

 

// Create the data source to watch
DataProducer<Voter> voters = new DataProducer<Voter>();

// Add the aggregators
IFuture<int> total = voters.Count();
IFuture<int> adults = voters.Count(voter => voter.Age >= 18);
IFuture<int> children = voters.Where(voter => voter.Age < 18).Count();
IFuture<int> youngest = voters.Min(voter => voter.Age);
IFuture<int> oldest = voters.Select(voter => voter.Age).Max();

// Push all the data through
voters.ProduceAndEnd(Voter.AllVoters());

// Write out the results
Console.WriteLine("Total voters: {0}", total.Value);
Console.WriteLine("Adult voters: {0}", adults.Value);
Console.WriteLine("Child voters: {0}", children.Value);
Console.WriteLine("Youngest voter age: {0}", youngest.Value);
Console.WriteLine("Oldest voter age: {0}", oldest.Value);

The output of the code is what you’d expect, but there are a few things to note:

  1. Each aggregate returns an IFuture<int> instead of an int. This is because we set up the aggregators before we produce any data. We need to use the Value property to get the actual value back after we’ve produced the data.
  2. Just to hammer the point home, we must set up the aggregators (calling Count etc) before we produce the data (in ProduceAndEnd). Otherwise the aggregators won’t have any data to work with.
  3. We can chain operators together (Select and then Max, or Where and then Count) just as with normal LINQ.
  4. We’re applying multiple aggregates, but the data is only being produced once. This just can’t be done with normal LINQ. ProduceAndEnd takes an IEnumerable<T> which could be another LINQ query – something fetching large amounts of data from files, etc. Everything will be streamed appropriately.
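To make the observer idea concrete, here’s a minimal sketch of the kind of machinery involved. The member names (DataProduced, EndOfData) and the toy “future” behaviour are illustrative assumptions – the real MiscUtil types are richer and more careful than this:

// A toy producer (NOT the MiscUtil implementation): pushes each item
// to its observers, then signals that the data has finished.
class ToyProducer<T>
{
    public event Action<T> DataProduced = delegate { };
    public event Action EndOfData = delegate { };

    public void ProduceAndEnd(IEnumerable<T> source)
    {
        foreach (T item in source)
        {
            DataProduced(item);
        }
        EndOfData();
    }
}

// A toy "future" count: watches items going past, but only exposes
// the total once the end-of-data signal has been seen.
class ToyCount<T>
{
    private int count;
    private bool completed;

    public ToyCount(ToyProducer<T> producer)
    {
        producer.DataProduced += item => count++;
        producer.EndOfData += () => completed = true;
    }

    public int Value
    {
        get
        {
            if (!completed)
            {
                throw new InvalidOperationException("Data hasn't all been produced yet");
            }
            return count;
        }
    }
}

Usage mirrors the snippet above: attach a ToyCount to a ToyProducer, call ProduceAndEnd, and only then read Value.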

3) Using DataProducer in query expressions

This part wouldn’t have been available when I started writing this post – I hadn’t quite realised the power of the pattern yet. By implementing GroupBy on IDataProducer, we can perform the original grouping in a query expression, in a pretty normal kind of way… except that this time we can apply multiple aggregates, never buffering the data beyond the results of the aggregation:

 

DataProducer<Voter> voters = new DataProducer<Voter>();

var query = from voter in voters
            group voter by voter.FavouriteColour.ToUpper() into grouped
            select new { Colour = grouped.Key, 
                         Votes = grouped.Count(),
                         MinAge = grouped.Min(voter => voter.Age),
                         MaxAge = grouped.Max(voter => voter.Age)};

var results = query.AsEnumerable();

voters.ProduceAndEnd(Voter.AllVoters());

foreach (var entry in results)
{
    Console.WriteLine("Colour {0} has {1} votes. Age range: {2}-{3}",
                      entry.Colour, entry.Votes.Value,
                      entry.MinAge.Value, entry.MaxAge.Value);
}

There’s just one tricky bit in here – you must call AsEnumerable before the data is produced; otherwise the results of the aggregation will be pushed out with nothing watching for them. In fact, AsEnumerable builds a list internally – the final results are buffered, but only those results. There’s really not a lot that can be done about that.

So, there we go. That may or may not be a bit clearer now. I’m still learning the power of the pattern, its potential uses, and the best ways of explaining it. Feedback is very welcome, both on the technical front and on the explanation. I’m absolutely convinced that it’s a useful pattern in some situations (though not all). All the code will be released as part of MiscUtil eventually, of course – we’re still tidying it up and producing a bit more documentation at the moment.

12 thoughts on ““Push” LINQ revisited – next attempt at an explanation”

  1. Too slow, too bloated and killing a machine by further order of magnitude (another 0 added).

    LINQ finally out of hype, try and attempt the same with imperative code.


  2. Ara, it’s not clear whether you mean “push” LINQ or LINQ in general. Furthermore, I certainly haven’t seen LINQ taking an order of magnitude out of performance, when used properly. Evidence?


  3. Very cool stuff! :)

    A thought: why not have a factory method on the DataProducer so that to create a query (as in the last example in your post) you do

    var query = voters.Query( from voter in voters…. );

This method could then hide the AsEnumerable() ‘hack’, and it would sit nicely next to the IFuture methods on there as well.


  4. Very nice. Any chance that you’ll be posting the DataProducer source code, or do you just plan on publishing that as a binary?

    Thanks for the article!


  5. I’ve seen this pattern before, even recently. It’s well-worth the study.

    The decision to make is who controls when to halt the data. The “Push” model places that decision at the data source, whereas the “Pull” model, naturally, places it with the requester. Both are valid choices to make, based on the nature of the two ends involved (eg, is one end a serial port, or are you wanting to truncate the data after finding the first Foo?).


  6. @Keith – a bit overdue now (I was just passing), but the observers can take themselves out of the loop fairly easily – the First/Take etc implementations do exactly this.

    More importantly, if they have something chained watching data from *them*, they send the “all done” signal at this early time – so the result of your .Where(…).Take(10).Sum() can be available long before the “real” data feed has finished… as soon as we’ve seen 10 items, we can (and do) sum them.


  7. Tremendous!

    Could you pass ‘query’ into the ProduceAndEnd() method, have it create ‘results’ (calling query.AsEnumerable()), then return results from ProduceAndEnd()?


  8. @Jeff:

    I now have DataProducer.PumpProduceAndEnd; it’s probably easier to look at the code (in MiscUtil) than explain. Basically it takes a “final results” enumerable and yields any values produced by that, pumping more data into the pipeline when there’s nothing being yielded. Unfortunately this introduces buffering, but it’s relatively minimal.

