Not bad… for a human

Just replaced the HDD in my Alienware M15x with a 256 GB Crucial C300.


If you’re interested, the “physical” part of the upgrade took less than 20 minutes. There was a brief moment of panic when I discovered that the pins on the old drive were different to those on the new drive. However, what looked like part of the old drive was actually a little adapter that could be fitted to the C300. Photos will explain better than I can.


I also took a photo of the Alienware’s internals, if anyone is interested. Dual exhaust fans and heat pipes probably explain why it weighs so much :P


I’m also using this post as a test for Live Writer 2011. Seems pretty good so far.

Visual Studio 2010 is downloading…

Rx Schedulers – Reverse Trampoline

Recently I’ve been playing around with some *crazy* IScheduler implementations, and I’m going to do a series of posts outlining some of my ideas. I’ve not had time to test or meditate on some of these concepts, so if they turn out to be poorly conceived, hare-brained ideas, I’m sorry!

 

Some background first

If you are not familiar with the concept of a trampoline (aka a message loop), it is a pattern where functions are not executed immediately. Instead, they are placed onto a queue and executed later by another piece of code. In case you were not aware, the “CurrentThreadScheduler” in the Reactive Framework uses this pattern. This post does a good job of summarizing its usefulness and its role in Rx:

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/7f75482f-eff2-4938-9491-47fe870989e8

Bart has a great post on “Stack Friendly Recursion”.

http://community.bartdesmet.net/blogs/bart/archive/2009/11/08/jumping-the-trampoline-in-c-stack-friendly-recursion.aspx

If you don’t “get it” yet, spend some time trying to get your head around this mind-bending code. The correct and expected output is:

A
C
B

 

using System;
using System.Collections.Generic;

class Program
{
    static void Main()
    {
        var tramp = new Trampoline();
        tramp.Invoke(() =>
        {
            Console.WriteLine("A");
            tramp.Invoke(() => Console.WriteLine("B"));
        });
        tramp.Invoke(() => Console.WriteLine("C"));
        tramp.Run();
    }
}

class Trampoline
{
    private readonly Queue<Action> queue = new Queue<Action>();

    public void Invoke(Action action)
    {
        queue.Enqueue(action);
    }

    public void Run()
    {
        while (queue.Count > 0)
        {
            queue.Dequeue()();
        }
    }
}
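Bart’s post is about using a trampoline for stack-friendly recursion. Below is a minimal sketch of that idea. It repeats the same queue-based `Trampoline` class so the snippet compiles on its own. The `CountDown` helper is hypothetical and exists only for illustration. It “recurses” by scheduling its next step rather than calling it, so each step returns before the next one starts and the call stack never gets deeper. Calling the step directly, 100,000 levels deep, could instead exhaust the stack.

```csharp
using System;
using System.Collections.Generic;

// Same queue-based trampoline as above, repeated so this sketch is self-contained.
public class Trampoline
{
    private readonly Queue<Action> queue = new Queue<Action>();

    public void Invoke(Action action) => queue.Enqueue(action);

    public void Run()
    {
        while (queue.Count > 0)
        {
            queue.Dequeue()();
        }
    }
}

public static class TrampolineRecursion
{
    // Hypothetical helper: counts down from n to 0 by scheduling the next step
    // instead of calling it. Each step returns before the next is dequeued,
    // so stack depth stays constant regardless of n. Returns the number of steps run.
    public static int CountDown(int n)
    {
        var tramp = new Trampoline();
        var steps = 0;
        Action<int> step = null;
        step = i =>
        {
            steps++;
            if (i > 0) tramp.Invoke(() => step(i - 1));
        };
        tramp.Invoke(() => step(n));
        tramp.Run();
        return steps;
    }
}
```

`TrampolineRecursion.CountDown(100000)` returns 100001, one step for each value from 100000 down to 0.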

 

The problem

A very important aspect of Rx, and a source of confusion for newcomers, is that all notifications in the query pipeline are serialized: events must be delivered one at a time, in the correct order. This makes sense if you think about it, because an enumerable must enumerate over its elements in order as well. For the most part this is not a concern for developers using Rx, but it is definitely something you need to understand and be aware of. The complication arises when writing subjects or operators that allow scheduler injection. Consider this partial implementation of ISubject:

 

public class MySubject<T> : ISubject<T>
{
    private readonly IScheduler scheduler;
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public MySubject(IScheduler scheduler)
    {
        this.scheduler = scheduler;
    }

    public void OnNext(T value)
    {
        lock(observers)
        {
            foreach(var o in observers)
            {
                scheduler.Schedule(() => o.OnNext(value)); // problem here
            }
        }
    }
}

 

The problem here is that each call to OnNext schedules a separate, independent work item. With concurrent schedulers like the thread pool or task pool, those work items can run on different threads. Events will then be subtly reordered as pre-emptive multitasking suspends and resumes the threads running them in parallel. This defect might go completely unnoticed, and it is difficult to test. It would be good to have a testing strategy that weeds out these sorts of scheduler-based race conditions.

 

The experiment

I don’t like writing multi-threaded unit tests. They are harder to write and more difficult to understand, so I avoid them wherever possible. This led me to design a single-threaded scheduler that deliberately processes its queue of work out of order, which makes these types of errors easy to detect. Here are my “single-threaded test schedulers”:

 

public interface ITestScheduler : IScheduler
{
    void Run();
}

public static class TestSchedulers
{
    public static readonly ITestScheduler Trampoline = new NormalTrampoline();
    public static readonly ITestScheduler Reverse = new ReverseTrampoline();
    public static IEnumerable<ITestScheduler> All
    {
        get
        {
            yield return Trampoline;
            yield return Reverse;
        }
    }

    private class NormalTrampoline : ITestScheduler
    {
        private readonly Queue<Action> queue = new Queue<Action>();

        public void Run()
        {
            while (queue.Count > 0)
            {
                queue.Dequeue()();
            }
        }

        public DateTimeOffset Now
        {
            get { throw new NotSupportedException(); }
        }

        public IDisposable Schedule(Action action, TimeSpan dueTime)
        {
            throw new NotSupportedException();
        }

        public IDisposable Schedule(Action action)
        {
            var disposable = new BooleanDisposable();
            queue.Enqueue(() =>
            {
                if (!disposable.IsDisposed) action();
            });
            return disposable;
        }
    }

    private class ReverseTrampoline : ITestScheduler
    {
        private readonly Stack<Action> stack = new Stack<Action>();

        public void Run()
        {
            while (stack.Count > 0)
            {
                stack.Pop()();
            }
        }

        public DateTimeOffset Now
        {
            get { throw new NotSupportedException(); }
        }

        public IDisposable Schedule(Action action, TimeSpan dueTime)
        {
            throw new NotSupportedException();
        }

        public IDisposable Schedule(Action action)
        {
            var disposable = new BooleanDisposable();
            stack.Push(() =>
            {
                if (!disposable.IsDisposed) action();
            });
            return disposable;
        }
    }
}
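The sketch below shows the only behavioural difference between the two schedulers without pulling in Rx. `OrderedTrampoline` and `CancelToken` are hypothetical stand-ins for the two private classes and for Rx’s `BooleanDisposable`. The only thing that changes is whether work is kept in a `Queue<Action>` (FIFO) or a `Stack<Action>` (LIFO).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Rx's BooleanDisposable.
public sealed class CancelToken : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

// Hypothetical Rx-free model of NormalTrampoline (reverse = false)
// and ReverseTrampoline (reverse = true).
public sealed class OrderedTrampoline
{
    private readonly bool reverse;
    private readonly Queue<Action> queue = new Queue<Action>();
    private readonly Stack<Action> stack = new Stack<Action>();

    public OrderedTrampoline(bool reverse) { this.reverse = reverse; }

    public IDisposable Schedule(Action action)
    {
        var token = new CancelToken();
        // Skip the work if it was cancelled before the trampoline reached it.
        Action guarded = () => { if (!token.IsDisposed) action(); };
        if (reverse) stack.Push(guarded); else queue.Enqueue(guarded);
        return token;
    }

    public void Run()
    {
        while (reverse ? stack.Count > 0 : queue.Count > 0)
        {
            (reverse ? stack.Pop() : queue.Dequeue())();
        }
    }
}

public static class TrampolineOrderDemo
{
    // Schedules "A", "B" and "C", cancels "B", and returns the execution order.
    public static string Execute(bool reverse)
    {
        var scheduler = new OrderedTrampoline(reverse);
        var log = new List<string>();
        scheduler.Schedule(() => log.Add("A"));
        scheduler.Schedule(() => log.Add("B")).Dispose();
        scheduler.Schedule(() => log.Add("C"));
        scheduler.Run();
        return string.Join(",", log);
    }
}
```

`TrampolineOrderDemo.Execute(false)` returns "A,C" and `Execute(true)` returns "C,A". In both cases the disposed work item "B" is skipped, mirroring the `IsDisposed` check in `Schedule`.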

 

Take note of the reverse (stack-based) trampoline scheduler. I would have liked to base these test schedulers on the TestScheduler and VirtualScheduler in the Rx framework. However, VirtualScheduler is built on top of a queue, so I don’t think it would be possible to alter the order like this.

Going back to our subject example, we can now write a unit test like this:

 

void Test()
{
    foreach (var scheduler in TestSchedulers.All)
    {
        var subject = new MySubject<int>(scheduler);
        var expected = Enumerable.Range(0, 10).ToList();
        var results = new List<int>();
        subject.Subscribe(results.Add);

        foreach (var i in expected)
        {
            subject.OnNext(i);
        }

        // Nothing is delivered until the test scheduler drains its work.
        scheduler.Run();

        Assert.IsTrue(results.SequenceEqual(expected));
    }
}

This test would fail given the naive, buggy subject implementation we defined earlier, because the reverse trampoline executes the scheduled notifications last-in, first-out.
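The same failure can be reproduced without Rx or a test framework. In this sketch, `NaiveSubject<T>` is a hypothetical, cut-down model of `MySubject<T>`. Like the “problem here” line, it schedules one work item per observer per `OnNext`. The scheduler is reduced to a delegate that pushes work onto either a queue or a stack.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, Rx-free model of the naive MySubject<T>:
// every OnNext schedules an independent work item per observer.
public sealed class NaiveSubject<T>
{
    private readonly Action<Action> schedule;
    private readonly List<Action<T>> observers = new List<Action<T>>();

    public NaiveSubject(Action<Action> schedule) { this.schedule = schedule; }

    public void Subscribe(Action<T> onNext) => observers.Add(onNext);

    public void OnNext(T value)
    {
        foreach (var observer in observers)
        {
            schedule(() => observer(value));
        }
    }
}

public static class ReverseTrampolineTest
{
    // Runs the test from the post with a FIFO (reverse = false) or LIFO (reverse = true)
    // runner and reports whether the observed order matched the input order.
    public static bool Passes(bool reverse)
    {
        var queue = new Queue<Action>();
        var stack = new Stack<Action>();
        Action<Action> schedule = a => { if (reverse) stack.Push(a); else queue.Enqueue(a); };

        var subject = new NaiveSubject<int>(schedule);
        var expected = Enumerable.Range(0, 10).ToList();
        var results = new List<int>();
        subject.Subscribe(results.Add);

        foreach (var i in expected) subject.OnNext(i);

        // Drain the work, the equivalent of calling Run() on a test scheduler.
        while (queue.Count > 0) queue.Dequeue()();
        while (stack.Count > 0) stack.Pop()();

        return results.SequenceEqual(expected);
    }
}
```

`ReverseTrampolineTest.Passes(false)` returns true. `Passes(true)` returns false because the results come out as 9 down to 0.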

 

Conclusion / Coming up

I’ve not given this a good road test yet. I’d be very interested in sparking a discussion about this, especially if you can see problems with this approach.

Coming up, we will look at an easy way to build a subject so that it passes this test.

Does my cluster have quorum?

I’m finally returning to my blog after a two-month hiatus. In a previous series of posts, I was looking into various heartbeat patterns in Rx. I generally don’t like “multi-part blog series”, as there is pressure to finish them off and I have a tendency to get distracted.

1. http://enumeratethis.com/2010/10/19/heart-beats-keep-alives-rx/
2. http://enumeratethis.com/2010/10/21/refining-the-heat-beat-timeout/
3. http://enumeratethis.com/2010/10/22/monitoring-cluster-nodes-in-rx/
4. Does my cluster have quorum? (You’re looking at it.)

Last time we ended up with a query that tells us which nodes in a cluster are currently sending us heartbeats:

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive.Linq; // Observable (Rx 2.0 and later)

namespace ConsoleApplication1
{
    class Program
    {
        static void Main()
        {
            //..

            // query tells us which nodes are up or down.
            var query =
                from nodeId in source
                group nodeId by nodeId into grp
                from isConnected in IsConnected(grp, timeout)
                select new { nodeId = grp.Key, isConnected };

            //...
        }

        // takes a stream of heart beats & a timeout period.
        // transforms into an observable bool that tells us if we are connected or not.
        static IObservable<bool> IsConnected<T>(IObservable<T> heartbeats, TimeSpan timeout)
        {
            var connected = Observable.Return(true);
            var disconnected = Observable.Return(false).Delay(timeout);

            return Observable.Switch
            (
                from hb in heartbeats
                select connected.Concat(disconnected)
            ).DistinctUntilChanged();
        }
    }
}
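The `Switch`/`Delay`/`DistinctUntilChanged` combination is easier to reason about with concrete times. Below is a hypothetical, clock-free model of what `IsConnected` emits. Each heartbeat produces `true` immediately and schedules a `false` one timeout later. A newer heartbeat that arrives first cancels that pending `false`; this is the cancellation `Switch` performs when it moves to the next inner observable. Repeated values are suppressed. This is an approximation for reasoning about the query, not a replacement for it. For example, a heartbeat arriving exactly at the expiry time is treated as arriving too late.

```csharp
using System;
using System.Collections.Generic;

public static class HeartbeatSketch
{
    // Hypothetical model of IsConnected: given ascending heartbeat times (seconds)
    // and a timeout, returns the (time, isConnected) values the Rx query would emit.
    public static List<(double time, bool isConnected)> Transitions(
        IReadOnlyList<double> heartbeats, double timeout)
    {
        var result = new List<(double time, bool isConnected)>();
        bool? last = null;

        // Mirrors DistinctUntilChanged: only record a value when it differs from the last one.
        void Emit(double t, bool value)
        {
            if (last != value) { result.Add((t, value)); last = value; }
        }

        for (int i = 0; i < heartbeats.Count; i++)
        {
            Emit(heartbeats[i], true); // "connected" fires as soon as the heartbeat arrives
            var expiry = heartbeats[i] + timeout;
            // Mirrors Switch: a newer heartbeat before expiry cancels the pending "false".
            bool cancelled = i + 1 < heartbeats.Count && heartbeats[i + 1] < expiry;
            if (!cancelled) Emit(expiry, false);
        }
        return result;
    }
}
```

With heartbeats at 0, 0.5 and 3 seconds and a 1-second timeout, `Transitions` returns (0, true), (1.5, false), (3, true), (4, false).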

 

 

However, in my case what I’m actually interested in is the cluster’s “quorum”:

Quorum – “The minimal number of officers and members of a committee or organization, usually a majority, who must be present for valid transaction of business.”

Let us assume that quorum means at least half of the nodes in the cluster are up. Really, we just need a reactive “counter” that is incremented and decremented as nodes come online and go offline. Scan is perfect for reactive counters:

query.Scan(0, (a, v) => v.isConnected ? a + 1 : a - 1)
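To make the counter concrete, here is a sketch that replays a fixed list of notifications through an `IEnumerable` version of `Scan`. The `Scan` extension is a hypothetical helper written for illustration, because the standard `System.Linq.Enumerable` class has no `Scan`. Like Rx’s `Scan`, it yields every intermediate accumulator value but not the seed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReactiveCounterSketch
{
    // Hypothetical IEnumerable counterpart of Rx's Scan:
    // yields each intermediate accumulator value, but not the seed.
    public static IEnumerable<TAcc> Scan<TSource, TAcc>(
        this IEnumerable<TSource> source, TAcc seed, Func<TAcc, TSource, TAcc> accumulator)
    {
        var acc = seed;
        foreach (var item in source)
        {
            acc = accumulator(acc, item);
            yield return acc;
        }
    }

    // Replays sample up/down notifications through the same counter lambda as the post.
    public static List<int> ConnectedCounts()
    {
        var events = new[]
        {
            new { nodeId = "martyn", isConnected = true },
            new { nodeId = "warne", isConnected = true },
            new { nodeId = "ponting", isConnected = true },
            new { nodeId = "warne", isConnected = false },
        };
        return events.Scan(0, (a, v) => v.isConnected ? a + 1 : a - 1).ToList();
    }
}
```

`ConnectedCounts()` returns 1, 2, 3, 2. The counter only stays accurate because `IsConnected` applies `DistinctUntilChanged`, so a node never reports `true` twice in a row.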

 

We can then transform this into a reactive Boolean telling us if the cluster has quorum or not.

var hasQuorum =
    from count in query.Scan(0, (a, v) => v.isConnected ? a + 1 : a - 1)
    select count >= 2; // at least half of a three-node cluster
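The literal 2 is the “at least half” rule applied to the three nodes used in the sample program. If the cluster size is known, the threshold can be derived instead of hard-coded. `HasQuorum` below is a hypothetical helper that keeps the post’s definition (at least half the nodes). It uses integer arithmetic so that 3 / 2 is not rounded down to 1.

```csharp
public static class QuorumSketch
{
    // Hypothetical helper: true when at least half of clusterSize nodes are connected.
    // Comparing count * 2 with clusterSize avoids integer-division rounding.
    public static bool HasQuorum(int connectedCount, int clusterSize) =>
        connectedCount * 2 >= clusterSize;
}
```

Inside the query this would read `select QuorumSketch.HasQuorum(count, 3)`, which is equivalent to `count >= 2`.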

I hope you find this useful. Here is a complete sample program that covers everything discussed in this series. As a TODO, I’d like to revisit all of this with the new Join, GroupJoin and Window operators in Rx; it’s possible they could be applied here.

using System;
using System.Collections.Generic;
using System.Linq;
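using System.Text;
using System.Reactive.Linq;     // Observable (Rx 2.0 and later)
using System.Reactive.Subjects; // Subject<T> (Rx 2.0 and later)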

namespace ConsoleApplication1
{
    class Program
    {
        static void Main()
        {
            // timeout period
            var timeout = TimeSpan.FromSeconds(1);
            // test source
            var source = new Subject<string>();

            // query tells us which nodes are up or down.
            var query =
                from nodeId in source
                group nodeId by nodeId into grp
                from isConnected in IsConnected(grp, timeout)
                select new { nodeId = grp.Key, isConnected };

            query.Subscribe(Console.WriteLine);

            // query tells us if the cluster has quorum.
            var hasQuorum =
                from count in query.Scan(0, (a, v) => v.isConnected ? a + 1 : a - 1)
                select count >= 2;

            hasQuorum
                .DistinctUntilChanged()
                .Subscribe(b => Console.WriteLine("has quorum: " + b));

            source.OnNext("martyn");
            source.OnNext("warne");
            source.OnNext("ponting");

            Console.ReadLine();
        }

        // takes a stream of heart beats & a timeout period.
        // transforms into an observable bool that tells us if we are connected or not.
        static IObservable<bool> IsConnected<T>(IObservable<T> heartbeats, TimeSpan timeout)
        {
            var connected = Observable.Return(true);
            var disconnected = Observable.Return(false).Delay(timeout);

            return Observable.Switch
            (
                from hb in heartbeats
                select connected.Concat(disconnected)
            ).DistinctUntilChanged();
        }
    }
}

 

Posted in .NET, C#, LINQ, Observable, Rx.