I think being a technology that enables us to easily write multithreaded, concurrent applications, a question often asked is;
“how fast is this stuff?”
As Rx completes its metamorphosis from “labs project” to “official product”, it seems the team has started focusing on performance. This is probably quite normal as the project shifts from designing the perfect API to supporting a stable product.
I thought I’d share some benchmarks on where some of these improvements have been made.
Subjects
Subjects are used everywhere in Rx, so its a good candidate for optimisation, giving us across the board performance gains. I’m testing the throughput by publishing 1 million notifications.
// Subject thoughput performance test
var subject = new Subject<int>();
subject.Subscribe(_ => { });
var sw = Stopwatch.StartNew();
for (int i = 0; i < 1000000; i++)
{
subject.OnNext(i);
}
Console.WriteLine(sw.Elapsed);
Results
1.0.2698.104 – 2.38 seconds
1.0.2856.104 (FastSubject) – 0.12 seconds
1.0 Stable – 0.04 seconds

The performance increase is due to two factors. As Wes & Bart explained in their latest video, they have removed the responsibility of scheduling from the subject implementations. This accounts for the improvement between 2698 & the 2856 FastSubject. The next improvement is due to the reduction in memory allocation. Subject’s used to make a copy of the observers array, before exiting the lock and publishing the notification to all the observers. This has been replaced with an immutable list that can be safely access outside of the lock. This immutable list is replaced wholesale when observers subscribe/unsubscribe. Throughput is more important than the subscribe & unsubscribe operations. This make sense if you think about it, its really just how multicast delegate work. Hmmm, more improvements to come?
Schedulers
Also discussed in the video, are the changes to the IScheduler interface. I’m testing the scenario where we want to adapt and enumerable into an observable using the NewThreadScheduler.
var sw = Stopwatch.StartNew();
Enumerable.Range(1, 10000)
.ToObservable(Scheduler.NewThread)
.Count()
.Single();
Console.WriteLine(sw.Elapsed);
Results
1.0.2856.104 – 3.43 seconds
v1.0 Stable – 0.14 seconds

The reason for this gain is pretty simple. The old scheduling API didn’t provide the action that was being executed any contextual information about the thread is was running on. This meant that for NewThreadScheduler to work properly, ToObservable would have had to been hard wired to create a dedicated event loop when the caller subscribes. But that would fly against the whole idea of parameterising concurrency. The IScheduler interface has been overhauled to accommodate scenarios like this.
Was
public interface IScheduler
{
IDisposable Schedule(Action action);
IDisposable Schedule(Action action, TimeSpan dueTime);
DateTimeOffset Now { get; }
}
Now
public interface IScheduler
{
IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action);
IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action);
IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action);
DateTimeOffset Now { get; }
}
Don’t panic, you probably won’t ever need to implement this guy, and extension methods allow you to keep using the old API (which I really like!), for example;
Scheduler.NewThread.Scheduler(() => DoStuff());
ObserveOn
This wasn’t discussed in the video but I thought it was worth mentioning. ObserverOn has been redesigned so that it makes use of the SchedulerObserver<T>. I always found it strange that this serialization code was duplicated. They’ve addressed that. Shame SchedulerObserver<T> has not been made public though. I think it would be a useful building block for people building their own operators. Anyway, the test code;
var sw = Stopwatch.StartNew();
Observable.Range(1,10000)
.ObserveOn(Scheduler.ThreadPool)
.Count()
.Single();
Console.WriteLine(sw.Elapsed);
Results
1.0.2856.104 – 0.8 seconds
v1.0 Stable – 0.2 seconds

I can’t help but feel that there is still more we can squeeze out of ObserveOn. I’ve been playing around with some alternate APIs for this. They will probably find their way into Rxx shortly. More on this soon.
Conclusion
The last few builds have yielded some great improvements, an I’m sure there are more to come. If you run into any performance issues I’d encourage you to hit to forums. More soon.