Marc Gravell and I have now implemented a lot of LINQ standard query operators on the “push” model of IDataProducer as opposed to the “pull” model of IEnumerable. My good friend Douglas Leeder (who doesn’t use C#) has been with me this weekend, and through explaining the “big picture” to him in various ways, and taking his feedback, I think I’ve now got a good way of communicating it: voting.
It’s a “real life analogy” which is always dangerous – don’t think of it too literally – I’m not claiming that it’s meant to be an absolute 1:1 correspondence. However, I think it neatly demonstrates the problem and some of the benefits of the solution we’ve come up with.
In order to make all this concrete, all of the code is real, and can be downloaded as a zip of a VS2008 solution. It contains a binary of an unreleased version of MiscUtil which is where the DataProducer stuff currently lives.
Real life situation
Let’s suppose we’re trying to find out what the favourite colour is of everyone in the world. Now, for the purposes of the demo code, there are only four colours and six people in the world: that makes the diagrams nice and easy, and we can see results simply too. Extending the data to the rest of the real world is left as an exercise to the reader. We may also want additional information, such as the average ages of people voting for particular colours.
Here’s our complete sample data – the five members of my family, and Douglas:
| Name | Age | Favourite colour |
|---------|-----|------------------|
| Jon | 31 | Blue |
| Douglas | 28 | red |
| Holly | 31 | Purple |
| Tom | 4 | Pink |
| Robin | 1 | RED |
| William | 1 | blue |
Note how the colours are specified with variations of case. We’ll use that later as an example of why you might need to specify a “key comparer”.
There are various ways of implementing this in LINQ, and for each model we’ll provide code and think about how it would work in the real world.
Model 1: “Pull” model
This is the model which “normal” LINQ uses – you only ever pull data, using an IEnumerable<T>. Here’s a simple query expression which gives the answers we want (admittedly unordered – we’ll ignore that for now):
var query = from voter in Voter.AllVoters()
            group voter by voter.FavouriteColour.ToUpper() into grouped
            select new { Colour = grouped.Key, Votes = grouped.Count() };

foreach (var entry in query)
{
    Console.WriteLine("Colour {0} has {1} votes", entry.Colour, entry.Votes);
}
There are two problems here.
Firstly, we’re using ToUpper() to get round the “RED != red” problem. This is not only bad in terms of internationalisation, but it also loses data. We really want to get the original string as the key, and then use a case-insensitive comparer. We can do this by a manual call to GroupBy instead of using query expressions – there’s an overload which takes an IEqualityComparer.
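For reference, here’s a sketch of what that manual call looks like – the standard GroupBy overload which takes an IEqualityComparer<string>, using the framework’s StringComparer.CurrentCultureIgnoreCase (Voter.AllVoters() is the sample data source from this post):

```csharp
// Sketch: the same grouping with a case-insensitive key comparer instead of
// ToUpper(). The key keeps its original casing (whichever variant arrives
// first in the data), and comparisons respect the current culture.
var query = Voter.AllVoters()
    .GroupBy(voter => voter.FavouriteColour,
             StringComparer.CurrentCultureIgnoreCase)
    .Select(grouped => new { Colour = grouped.Key, Votes = grouped.Count() });
```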
Secondly, the result of the “group … by” keeps all the voter data temporarily. All of it has to be available at the same time before the “select” kicks in. This runs counter to the normal “streaming” idea of LINQ. This is inherent in the nature of the “pull” model, as I’ll explain in a minute.
Now, let’s see what this looks like in the real world. People come into a room through a door, and a “grouper” asks them for their favourite colour. The grouper then tells each voter (immediately) which corner of the room to stand in. The result at the end of the grouping is this:

After the initial grouping, another person goes to each group in turn, finding out their key and doing a head count. That group is then free to go. The important thing is that this person can’t do their job until all the data is in, because they’ve got to be able to see everyone in order to count them.
Improvement to pull model: just keep a token presence
The fact that we used “group voter by …” meant that the result of the grouping still involved whole people. As we’re only going to do a head count, we only need something saying “There was a person here.” We can change our original query to do that quite easily:
var query = from voter in Voter.AllVoters()
            group 1 by voter.FavouriteColour.ToUpper() into grouped
            select new { Colour = grouped.Key, Votes = grouped.Count() };

foreach (var entry in query)
{
    Console.WriteLine("Colour {0} has {1} votes", entry.Colour, entry.Votes);
}
This time, after the grouping takes place, the room looks like this:

The use of 1 here is purely incidental: we could have used ‘group “Spartacus” by …’ and the results would have been the same. It’s just something which can be counted.
Now, there’s good and bad here:
- We’re using much less memory. If voters have large amounts of data attached to them, we’ve reduced our requirements significantly.
- We still have one object per voter, all in memory at the same time. Think “population of the world”.
- We’ve lost our age data, which would make any extra aggregation impossible.
Model 2: “Push” model
The problem with the pull model is that each aggregator always wants to be the only thing pulling. The call to MoveNext will block until more data is available. That’s a real problem when you want to have multiple aggregators (one vote counter per colour). We could do a complicated threading manoeuvre, with each colour getting its own thread and the “grouper” pushing items out to relevant threads. Again though, that doesn’t scale – the four extra threads in our example aren’t too bad, but imagine other groupings with potentially thousands of keys.
The alternative is to change the model. Instead of having a greedy aggregator pulling data, we change to aggregators who observe data being pushed past them, and also observe a special “all the data has now been pushed” signal. Before we look at the code to do this, let’s think about what it could be like in real life. We don’t know how many different colours will be voted on, but we know what we need to do with each one: count the number of votes for them. In detail, the situation would be something like this:
- The grouper stands just inside the door of the room, and “pulls” voters in the normal way
- For each voter:
  - Ask the voter which colour they wish to vote for
  - Check to see if that colour is a “new” one. If it is, create a “counter” person for that colour, and position them by an exit in the room. (We create new exits as we go. We’ll assume there’s a sledgehammer at the ready.)
  - Send the voter past the relevant “counter” person, through the exit near them
- Each counter just counts how many voters they see going past them
- When all voters have been pulled, tell each of the counters and ask them how many people they saw
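The shape of the code behind this model is worth sketching before we look at the real thing. The event names below are assumptions for illustration – the actual MiscUtil interface may differ in detail:

```csharp
// Sketch of the push model: a producer raises an event per item, plus a
// final "all the data has now been pushed" signal; observers subscribe.
public interface IDataProducer<T>
{
    event Action<T> DataProduced;   // raised once per item pushed past the observers
    event Action EndOfData;         // raised when no more data is coming
}

// A trivial observer: counts how many items it sees going past,
// like the "counter" person standing by an exit.
public class CountingObserver<T>
{
    public int Count { get; private set; }

    public CountingObserver(IDataProducer<T> producer)
    {
        producer.DataProduced += item => Count++;
    }
}
```

The key point of the sketch is that any number of observers can subscribe to the same producer, so a single pass over the data can feed many aggregators at once – no extra threads required.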
We never have more than one voter in the room at once:

Let’s have a look at the code involved now.
Using the “push” model
There are two sides to the code here: the code that the LINQ user has to write, and the code Marc Gravell and I have implemented. We’ll look at the client code in a few different scenarios.
1) GroupWithPipeline in the middle of normal LINQ
Keeping to the normal “start with a data source, do something, then select” model involves stepping away from query expressions. We’ve got a new extension method on IEnumerable<T> called GroupWithPipeline, which takes a key selector (just like the normal GroupBy) and what to do with the results of each grouping. Here’s the new code (which requires a using directive for MiscUtil.Linq.Extensions):
var query = Voter.AllVoters()
    .GroupWithPipeline(voter => voter.FavouriteColour.ToUpper(),
                       voters => voters.Count())
    .Select(grouped => new { Colour = grouped.Key, Votes = grouped.Value });

foreach (var entry in query)
{
    Console.WriteLine("Colour {0} has {1} votes", entry.Colour, entry.Votes);
}
How about making this a bit smarter now? Let’s try to also work out the minimum and maximum ages of the voters for each colour. Conceptually this is just a case of adding extra observers along with each vote counter in the “real life” model above. The code is remarkably simple:
var query = Voter.AllVoters()
    .GroupWithPipeline(voter => voter.FavouriteColour.ToUpper(),
                       voters => voters.Count(),
                       voters => voters.Min(voter => voter.Age),
                       voters => voters.Max(voter => voter.Age))
    .Select(grouped => new { Colour = grouped.Key,
                             Votes = grouped.Value1,
                             MinAge = grouped.Value2,
                             MaxAge = grouped.Value3 });

foreach (var entry in query)
{
    Console.WriteLine("Colour {0} has {1} votes. Age range: {2}-{3}",
                      entry.Colour, entry.Votes, entry.MinAge, entry.MaxAge);
}
The fact that it uses “Value1”, “Value2” and “Value3” isn’t ideal, but unfortunately we haven’t found a way round that for this part.
2) Using DataProducer directly for multiple aggregates
GroupWithPipeline uses a few types internally which you can use directly instead: DataProducer (implementing IDataProducer) and Future (implementing IFuture). If I go into the details here, I’ll never get this posted – but that may come in another post if there’s enough interest. However, let’s have a look at how it can be used. First, let’s find the results of a few aggregates of our voters, this time without any groupings:
DataProducer<Voter> voters = new DataProducer<Voter>();

IFuture<int> total = voters.Count();
IFuture<int> adults = voters.Count(voter => voter.Age >= 18);
IFuture<int> children = voters.Where(voter => voter.Age < 18).Count();
IFuture<int> youngest = voters.Min(voter => voter.Age);
IFuture<int> oldest = voters.Select(voter => voter.Age).Max();

voters.ProduceAndEnd(Voter.AllVoters());

Console.WriteLine("Total voters: {0}", total.Value);
Console.WriteLine("Adult voters: {0}", adults.Value);
Console.WriteLine("Child voters: {0}", children.Value);
Console.WriteLine("Youngest voter age: {0}", youngest.Value);
Console.WriteLine("Oldest voter age: {0}", oldest.Value);
The output of the code is what you’d expect, but there are a few things to note:
- Each aggregate returns an IFuture<int> instead of an int. This is because we set up the aggregators before we produce any data. We need to use the Value property to get the actual value back after we’ve produced the data.
- Just to hammer the point home, we must set up the aggregators (calling Count etc) before we produce the data (in ProduceAndEnd). Otherwise the aggregators won’t have any data to work with.
- We can chain operators together (Select and then Max, or Where and then Count) just as with normal LINQ.
- We’re applying multiple aggregates, but the data is only being produced once. This just can’t be done in a single pass with normal LINQ.
- ProduceAndEnd takes an IEnumerable<T> which could be another LINQ query – something fetching large amounts of data from files, etc. Everything will be streamed appropriately.
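Under the hood, ProduceAndEnd presumably just iterates the source and pushes each item past the observers. Here’s a sketch of that idea – Produce and End are assumed names for the per-item and end-of-data signals, and may not match the actual MiscUtil members:

```csharp
// Sketch: driving the push model from an ordinary pull source.
// This is why any IEnumerable<T> - including another LINQ query
// streaming from files - can feed the producer without buffering.
public void ProduceAndEnd(IEnumerable<T> source)
{
    foreach (T item in source)
    {
        Produce(item);   // push one item past all the subscribed observers
    }
    End();               // signal that all the data has now been pushed
}
```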
3) Using DataProducer in query expressions
This part wouldn’t have been available when I started writing this post. I hadn’t quite realised the power of the pattern yet.
By implementing GroupBy on IDataProducer, we can perform the original grouping in a query expression, in a pretty normal kind of way… except that this time we can apply multiple aggregates, never buffering the data beyond the results of the aggregation:
DataProducer<Voter> voters = new DataProducer<Voter>();

var query = from voter in voters
            group voter by voter.FavouriteColour.ToUpper() into grouped
            select new { Colour = grouped.Key,
                         Votes = grouped.Count(),
                         MinAge = grouped.Min(voter => voter.Age),
                         MaxAge = grouped.Max(voter => voter.Age) };

var results = query.AsEnumerable();
voters.ProduceAndEnd(Voter.AllVoters());

foreach (var entry in results)
{
    Console.WriteLine("Colour {0} has {1} votes. Age range: {2}-{3}",
                      entry.Colour, entry.Votes.Value,
                      entry.MinAge.Value, entry.MaxAge.Value);
}
There’s just one tricky bit in here – you must call AsEnumerable before the data is produced, otherwise the results will stream past with nothing watching for them. In fact, AsEnumerable builds a list internally – the final results are buffered, but only those results. There’s really not a lot that can be done about that.
So, there we go. That may or may not be a bit clearer now. I’m still learning the power of the pattern, its potential uses, and the best ways of explaining it. Feedback is very welcome, both on the technical front and about the explanation. I’m absolutely convinced that it’s a useful pattern in some situations (though not all). All the code will be released as part of MiscUtil eventually, of course – we’re still tidying it up and producing a bit more documentation at the moment.