The IEnumerable<T> and IEnumerator<T> interfaces in .NET are interesting. They crop up an awful lot, but hardly anyone ever calls them directly – you almost always use a foreach loop to iterate over the collection. That hides all the calls to GetEnumerator(), MoveNext() and Current. Likewise, iterator blocks hide the details when you want to implement the interfaces. However, sometimes details matter – such as for this recent Stack Overflow question.

The question asks how to create a thread-safe iterator – one that can be called from multiple threads. This is not about iterating over a collection n times independently on n different threads – this is about iterating over a collection once without skipping or duplicating. Imagine it's some set of jobs that we have to complete.

We assume that the iterator itself is thread-safe to the extent that calls from different threads at different times, with intervening locks, will be handled reasonably. This is reasonable – basically, so long as it isn't going out of its way to be thread-hostile, we should be okay. We also assume that no-one is trying to write to the collection at the same time.
Sounds easy, right? Well, no… because the IEnumerator<T> interface has two members which we effectively want to call atomically. In particular, we don’t want the collection { “a”, “b” } to be iterated like this:
Thread 1     | Thread 2
-------------|-------------
MoveNext()   |
             | MoveNext()
Current      |
             | Current
That way we'll end up not processing the first item at all, and processing the second item twice.
There are two ways of approaching this problem. In both cases I've started with IEnumerable<T> for consistency, but in fact it's IEnumerator<T> which is the interesting bit. In particular, we're not going to be able to foreach over our result anyway, as each thread needs to share the same IEnumerator<T> – which they won't if each of them uses foreach (which calls GetEnumerator() to start with).
Fix the interface
First we’ll try to fix the interface to look how it should have looked to start with, at least from the point of view of atomicity. Here are the new interfaces:
public interface IAtomicEnumerable<T>
{
    IAtomicEnumerator<T> GetEnumerator();
}

public interface IAtomicEnumerator<T>
{
    bool TryMoveNext(out T nextValue);
}
One thing you may notice is that we’re not implementing IDisposable. That’s basically because it’s a pain to do so when you think about a multi-threaded environment. Indeed, it’s possibly one of the biggest arguments against something of this nature. At what point do you dispose? Just because one thread finished doesn’t mean that the rest of them have… don’t forget that “finish” might mean “an exception was thrown while processing the job, I’m bailing out”. You’d need some sort of co-ordinator to make sure that everyone is finished before you actually do any clean-up. Anyway, the nice thing about this being a blog post is we can ignore that little thorny issue :)
The important point is that we now have a single method in IAtomicEnumerator<T> – TryMoveNext, which works the way you’d expect it to. It atomically attempts to move to the next item, returns whether or not it succeeded, and sets an out parameter with the next value if it did succeed. Now there’s no chance of two threads using the method and stomping on each other’s values (unless they’re silly and use the same variable for the out parameter).
It’s reasonably easy to wrap the standard interfaces in order to implement this interface:
public sealed class AtomicEnumerable<T> : IAtomicEnumerable<T>
{
    private readonly IEnumerable<T> original;

    public AtomicEnumerable(IEnumerable<T> original)
    {
        this.original = original;
    }

    public IAtomicEnumerator<T> GetEnumerator()
    {
        return new AtomicEnumerator(original.GetEnumerator());
    }

    private sealed class AtomicEnumerator : IAtomicEnumerator<T>
    {
        private readonly IEnumerator<T> original;
        private readonly object padlock = new object();

        internal AtomicEnumerator(IEnumerator<T> original)
        {
            this.original = original;
        }

        public bool TryMoveNext(out T value)
        {
            lock (padlock)
            {
                bool hadNext = original.MoveNext();
                value = hadNext ? original.Current : default(T);
                return hadNext;
            }
        }
    }
}
Just ignore the fact that I never dispose of the original IEnumerator<T> :)
We use a simple lock to make sure that MoveNext() and Current always happen together – that nothing else is going to call MoveNext() between our TryMoveNext() calling it, and it fetching the current value.
Obviously you’d need to write your own code to actually use this sort of iterator, but it would be quite simple:
T value;
while (iterator.TryMoveNext(out value))
{
    // Process value here
}
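To see it in action, here's a sketch of a multi-threaded driver. The job names, job count and thread count are invented for illustration, and the wrapper is collapsed into a single class (skipping the enumerable half) so that the example compiles on its own:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Simplified variant of the wrapper above: just the enumerator half.
public interface IAtomicEnumerator<T>
{
    bool TryMoveNext(out T nextValue);
}

public sealed class AtomicEnumerator<T> : IAtomicEnumerator<T>
{
    private readonly IEnumerator<T> original;
    private readonly object padlock = new object();

    public AtomicEnumerator(IEnumerator<T> original)
    {
        this.original = original;
    }

    public bool TryMoveNext(out T value)
    {
        lock (padlock)
        {
            bool hadNext = original.MoveNext();
            value = hadNext ? original.Current : default(T);
            return hadNext;
        }
    }
}

class Demo
{
    static void Main()
    {
        var jobs = Enumerable.Range(0, 1000).Select(i => "job-" + i);
        var iterator = new AtomicEnumerator<string>(jobs.GetEnumerator());
        int processed = 0;

        // Four workers drain the single shared iterator between them.
        var threads = Enumerable.Range(0, 4)
            .Select(_ => new Thread(() =>
            {
                string value;
                while (iterator.TryMoveNext(out value))
                {
                    // "Process" the job; counting lets us check that nothing
                    // was skipped or duplicated.
                    Interlocked.Increment(ref processed);
                }
            }))
            .ToArray();

        foreach (var thread in threads) thread.Start();
        foreach (var thread in threads) thread.Join();

        Console.WriteLine(processed); // 1000 - each job handled exactly once
    }
}
```

However many workers you add, the total number of successful TryMoveNext calls can only be the number of items in the sequence, because the lock serializes each move-and-fetch pair.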
However, you may already have code which wants to use an IEnumerator<T>. Let’s see what else we can do.
Using thread local variables to fake it
.NET 4.0 has a very useful type called ThreadLocal<T>. It does basically what you’d expect it to, with nice features such as being able to supply a delegate to be executed on each thread to provide the initial value. We can use a thread local to make sure that so long as we call both MoveNext() and Current atomically when we’re asked to move to the next element, we can get back the right value for Current later on. It has to be thread local because we’re sharing a single IEnumerator<T> across multiple threads – each needs its own separate storage.
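As a tiny illustration of that per-thread isolation (the class name here is invented for the demo), a value written on one thread is invisible on another:

```csharp
using System;
using System.Threading;

class ThreadLocalDemo
{
    static void Main()
    {
        // The factory delegate runs once per thread, the first time that
        // thread reads Value.
        ThreadLocal<string> local = new ThreadLocal<string>(() => "unset");

        Thread worker = new Thread(() => local.Value = "set on worker");
        worker.Start();
        worker.Join();

        // The worker's write only affected its own per-thread slot, so the
        // main thread still sees the factory-supplied value.
        Console.WriteLine(local.Value); // prints "unset"
    }
}
```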
This is also the approach we’d use if we wanted to wrap an IAtomicEnumerator<T> in an IEnumerator<T>, by the way. Here’s the code to do it:
public class ThreadSafeEnumerable<T> : IEnumerable<T>
{
    private readonly IEnumerable<T> original;

    public ThreadSafeEnumerable(IEnumerable<T> original)
    {
        this.original = original;
    }

    public IEnumerator<T> GetEnumerator()
    {
        return new ThreadSafeEnumerator(original.GetEnumerator());
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    private sealed class ThreadSafeEnumerator : IEnumerator<T>
    {
        private readonly IEnumerator<T> original;
        private readonly object padlock = new object();
        private readonly ThreadLocal<T> current = new ThreadLocal<T>();

        internal ThreadSafeEnumerator(IEnumerator<T> original)
        {
            this.original = original;
        }

        public bool MoveNext()
        {
            lock (padlock)
            {
                bool ret = original.MoveNext();
                if (ret)
                {
                    current.Value = original.Current;
                }
                return ret;
            }
        }

        public T Current
        {
            get { return current.Value; }
        }

        public void Dispose()
        {
            original.Dispose();
            current.Dispose();
        }

        object IEnumerator.Current
        {
            get { return Current; }
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}
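Going the other way – wrapping an IAtomicEnumerator<T> back into an IEnumerator<T>, as mentioned above – uses the same thread-local trick. Here's a sketch (the adapter's name is my own, the interface is repeated so it compiles alone, and it suffers from exactly the same disposal problem):

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

// Repeated from earlier so this sketch stands alone.
public interface IAtomicEnumerator<T>
{
    bool TryMoveNext(out T nextValue);
}

// Presents an IAtomicEnumerator<T> as a classic IEnumerator<T>. No lock is
// needed here, because TryMoveNext is atomic by contract - we only need
// per-thread storage for the value it hands back.
public sealed class ClassicEnumeratorAdapter<T> : IEnumerator<T>
{
    private readonly IAtomicEnumerator<T> original;
    private readonly ThreadLocal<T> current = new ThreadLocal<T>();

    public ClassicEnumeratorAdapter(IAtomicEnumerator<T> original)
    {
        this.original = original;
    }

    public bool MoveNext()
    {
        T value;
        if (original.TryMoveNext(out value))
        {
            current.Value = value;
            return true;
        }
        return false;
    }

    public T Current
    {
        get { return current.Value; }
    }

    object IEnumerator.Current
    {
        get { return Current; }
    }

    public void Dispose()
    {
        current.Dispose();
    }

    public void Reset()
    {
        throw new NotSupportedException();
    }
}
```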
I’m going to say it one last time – we’re broken when it comes to disposal. There’s no way of safely disposing of the original iterator at “just the right time” when everyone’s finished with it. Oh well.
Other than that, it's quite simple. One caveat, though: because ThreadLocal<T> hands back default(T) for a thread which has never set a value, calling the Current property without having called MoveNext() quietly returns default(T) rather than throwing the InvalidOperationException the IEnumerator<T> documentation asks for. (The same applies after the end of the sequence. Both would be fixable with a per-thread "have we moved yet" flag, if we really wanted to be pedantic.)
Conclusion
I found this an intriguing little problem. I think there are better ways of solving the bigger picture: a co-ordinator which takes care of disposing exactly once, and which possibly mediates access to the original iterator, is probably the way forward… but I enjoyed thinking about the nitty-gritty.
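For what it's worth, such a co-ordinator could be as simple as a reference count, with the last worker to finish doing the disposal. This is a hypothetical sketch – the type name and shape are my own, not anything from the code above:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: disposes the shared enumerator exactly once, when the
// last of a known number of workers reports that it has finished - whether it
// completed normally or bailed out with an exception.
public sealed class IterationCoordinator<T>
{
    private readonly IEnumerator<T> enumerator;
    private int remainingWorkers;

    public IterationCoordinator(IEnumerator<T> enumerator, int workerCount)
    {
        this.enumerator = enumerator;
        this.remainingWorkers = workerCount;
    }

    public IEnumerator<T> Enumerator
    {
        get { return enumerator; }
    }

    // Each worker calls this exactly once, from a finally block, so the
    // decrement happens even when the worker's processing loop throws.
    public void WorkerFinished()
    {
        if (Interlocked.Decrement(ref remainingWorkers) == 0)
        {
            enumerator.Dispose();
        }
    }
}
```

Each worker wraps its processing loop in try/finally and calls WorkerFinished() in the finally block; Interlocked.Decrement guarantees that exactly one worker – the last one out – sees the count hit zero and performs the disposal.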
Generally speaking, I prefer the first of these approaches. Thread local variables always feel like a bit of a grotty hack to me – they can be useful, but it’s better to avoid them if you can. It’s interesting to see how an interface can be inherently thread-friendly or not.
One last word of warning – this code is completely untested. It builds, and I can’t immediately see why it wouldn’t work, but I’m making no guarantees…