Asynchronous Commands in Metro, WPF & Silverlight

I’ve seen quite a few examples demonstrating the new async/await language features (C# 5 & VB next) with button click events;

private async void button1_Click(object sender, RoutedEventArgs e)
{
    string url = "http://reedcopsey.com";
    string content = await new WebClient().DownloadStringTaskAsync(url);
    this.textBox1.Text = string.Format("Page {0} supports XHTML 1.0: {1}",
      url, content.Contains("XHTML 1.0"));
}

If you are using an architectural pattern like MVVM it’s unlikely that you’re writing code like this. In WPF, Silverlight & Metro you can bind buttons directly to an object implementing the ICommand interface.

// WPF ICommand interface
public interface ICommand
{
    /// <summary>
    /// Defines the method to be called when the command is invoked.
    /// </summary>
    /// <param name="parameter">Data used by the command.
    /// If the command does not require data to be passed, this object can be set to null.</param>
    void Execute(object parameter);

    /// <summary>
    /// Defines the method that determines whether the command can execute in its current state.
    /// </summary>
    /// 
    /// <returns>
    /// true if this command can be executed; otherwise, false.
    /// </returns>
    /// <param name="parameter">Data used by the command.
    /// If the command does not require data to be passed, this object can be set to null.</param>
    bool CanExecute(object parameter);

    /// <summary>
    /// Occurs when changes occur that affect whether or not the command should execute.
    /// </summary>
    event EventHandler CanExecuteChanged;
}

The nice thing about commands vs. a simple click event is that they encapsulate the logic informing the button whether or not it can execute. This is particularly useful when we start talking about asynchronous operations, as we might like to disable the button while the asynchronous request is in flight.

Example

    public bool CanExecute(object parameter)
    {
        return !isExecuting;
    }

    public async void Execute(object parameter)
    {
        isExecuting = true;
        OnCanExecuteChanged();
        try
        {
            // await some asynchronous operation
        }
        finally
        {
            isExecuting = false;
            OnCanExecuteChanged();
        }
    }

What About Errors?

Note that commands are generally executed by the UI framework's message loop, meaning that any unhandled exceptions (including those escaping an async void Execute) will be posted onto the relevant synchronisation context, where by default they will bring the application down.
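If you would rather errors never reach the synchronisation context, one option is to catch them inside the async void method and hand them to the view model. The sketch below is illustrative only; SafeExecutor and onError are names I've made up, not part of any framework:

```csharp
using System;
using System.Threading.Tasks;

// Illustrative sketch: catch inside the async void method and surface the
// exception through a callback, rather than letting it escape to the
// synchronisation context.
public class SafeExecutor
{
    private readonly Func<Task> execute;
    private readonly Action<Exception> onError; // hypothetical error hook

    public SafeExecutor(Func<Task> execute, Action<Exception> onError)
    {
        this.execute = execute;
        this.onError = onError;
    }

    public async void Execute(object parameter)
    {
        try
        {
            await execute();
        }
        catch (Exception ex)
        {
            onError(ex); // report instead of crashing the message loop
        }
    }
}
```

The view model decides what to do with the exception: log it, show a dialog, set an error property, and so on.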

AsyncCommand

This pattern is easily captured in a reusable object that we can use to build all our asynchronous commands.

    // a reusable asynchronous command
    public class AsyncCommand : ICommand
    {
        private readonly Func<Task> execute;
        private readonly Func<bool> canExecute;
        private bool isExecuting;

        public AsyncCommand(Func<Task> execute) : this(execute, () => true) { }

        public AsyncCommand(Func<Task> execute, Func<bool> canExecute)
        {
            this.execute = execute;
            this.canExecute = canExecute;
        }

        public bool CanExecute(object parameter)
        {
            // if the command is not executing, run the user's can-execute logic
            return !isExecuting && canExecute();
        }

        public event EventHandler CanExecuteChanged;

        public async void Execute(object parameter)
        {
            // tell the button that we're now executing...
            isExecuting = true;
            OnCanExecuteChanged();
            try
            {
                // execute user code
                await execute();
            }
            finally
            {
                // tell the button we're done
                isExecuting = false;
                OnCanExecuteChanged();
            }
        }

        protected virtual void OnCanExecuteChanged()
        {
            var handler = CanExecuteChanged;
            if (handler != null) handler(this, EventArgs.Empty);
        }
    }

Usage

In your view model you can now create asynchronous commands like this;

// example command, simulate an operation that takes 2 seconds.
new AsyncCommand(() => TaskEx.Delay(2000));

// example command, with some custom can execute logic
new AsyncCommand(() => TaskEx.Delay(2000), () => IsValidInput());
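To see the disable-while-running behaviour end to end, here is a compact, self-contained sketch. It inlines a trimmed-down copy of an AsyncCommand and drives it with a TaskCompletionSource; nothing here is framework-specific beyond ICommand:

```csharp
using System;
using System.Threading.Tasks;
using System.Windows.Input;

// A trimmed-down AsyncCommand, just enough to show that CanExecute
// reports false while the awaited operation is in flight.
public class AsyncCommand : ICommand
{
    private readonly Func<Task> execute;
    private bool isExecuting;

    public AsyncCommand(Func<Task> execute) { this.execute = execute; }

    public event EventHandler CanExecuteChanged;

    public bool CanExecute(object parameter) { return !isExecuting; }

    public async void Execute(object parameter)
    {
        isExecuting = true;
        OnCanExecuteChanged();
        try { await execute(); }
        finally
        {
            isExecuting = false;
            OnCanExecuteChanged();
        }
    }

    protected virtual void OnCanExecuteChanged()
    {
        var handler = CanExecuteChanged;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}
```

Driving it this way makes the behaviour observable: after Execute is called, CanExecute returns false until the TaskCompletionSource completes the task.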

 

Memory Leaks

A word of warning… if your command object’s lifetime extends beyond that of the UI element (the Button) subscribing to the CanExecuteChanged event, you should implement a weak event pattern here. That is outside the scope of this article; I’ll follow up shortly.
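To give a flavour of what that follow-up might cover, here is a rough sketch of a weak event holder: it keeps only a weak reference to each handler's target, so the command never roots the Button. This is illustrative only and not production-ready (static handlers, for instance, are not supported):

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Rough sketch of a weak event: hold a weak reference to each handler's
// target and re-invoke via reflection, so subscribers can be collected.
public class WeakEvent
{
    private readonly List<(WeakReference Target, MethodInfo Method)> handlers =
        new List<(WeakReference Target, MethodInfo Method)>();

    public void Subscribe(EventHandler handler)
    {
        // NOTE: static handlers (null Target) are not supported by this sketch.
        handlers.Add((new WeakReference(handler.Target), handler.Method));
    }

    public void Raise(object sender, EventArgs e)
    {
        handlers.RemoveAll(h => !h.Target.IsAlive); // prune dead subscribers
        foreach (var (target, method) in handlers.ToArray())
        {
            object alive = target.Target; // take a strong reference for the call
            if (alive != null) method.Invoke(alive, new object[] { sender, e });
        }
    }
}
```

A real implementation would also need to deal with thread safety and with delegates whose closures are referenced by nothing else.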

In Conclusion

This is a great example of why async void methods are required in C#. Commands are a bridge between synchronous UI elements like buttons and your view model's asynchronous operations like web requests. Enjoy!

Reactive Extensions 2.0 Beta: Assembly References

Before you jump into Rx 2.0 Beta you should know about some of the changes to the hierarchy of assemblies.

Rx 1.0 assemblies were structured like this;

[Image: Rx 1.0 assembly hierarchy]

 

Rx 2.0 assemblies are structured like this;

[Image: Rx 2.0 assembly hierarchy]

The big change here is the addition of the System.Reactive.Interfaces & System.Reactive.PlatformServices.

I can only guess at the thinking behind this change, but here goes;

System.Reactive.Interfaces

This will allow people to define “service contracts” without bringing in the entire Rx stack.

System.Reactive.PlatformServices

This eliminates a hard dependency on platform specific scheduling, concurrency & timing including low level components such as the thread pool & high resolution timers.

More soon
James

Hostile to friendly type names

Have you ever written some code that dumps type names using reflection and run into annoyances like this?

Query
new[]
{
    typeof(List<int>),
    typeof(List<List<int>>),
    typeof(int?),
    typeof(bool),
}.Select(t => t.Name).Dump("hostile type names");
Output

[Image: output listing the hostile type names]

 

This can be problematic if you are trying to generate C# code via T4 templates or something similar.

A colleague and I recently solved this problem using the C# CodeDom (handy extension method included).

Query
new[]
{
    typeof(List<int>),
    typeof(List<List<int>>),
    typeof(int?),
    typeof(bool),
}
.Select(t => new
{    
    HostileName = t.Name,
    FriendlyName = t.GetFriendlyName()
})
.Dump("hostile -> friendly type names");
Output

[Image: output listing hostile and friendly type names]

 
Extension method
public static class TypeEx
{
    public static string GetFriendlyName(this Type t)
    {
        using (var provider = new CSharpCodeProvider())
        { 
            var typeRef = new CodeTypeReference(t);
            return provider.GetTypeOutput(typeRef);
        }
    }
}
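If taking a CodeDom dependency is awkward, a recursive reflection version gets you most of the way. This is a sketch of my own; its output differs slightly from GetTypeOutput (it emits Int32 rather than int, and doesn't handle arrays or nested types):

```csharp
using System;
using System.Linq;

public static class FriendlyNames
{
    // Sketch: build "List<List<Int32>>" style names by recursing into
    // generic type arguments; no CodeDom dependency required.
    public static string GetFriendlyName(Type t)
    {
        if (!t.IsGenericType) return t.Name;
        var name = t.Name.Substring(0, t.Name.IndexOf('`')); // strip the `1 arity suffix
        var args = string.Join(", ", t.GetGenericArguments().Select(GetFriendlyName));
        return name + "<" + args + ">";
    }
}
```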

Structs that implement IEnumerator

OK, let's just revise the query we had in the previous post. I'm going to change it so that it finds enumerators that are also value types;

from f in Directory.GetFiles (
       Path.Combine (
             Environment.GetFolderPath (System.Environment.SpecialFolder.ProgramFilesX86),
             @"Reference Assemblies\Microsoft\Framework\.NETFramework\v4.0"),
       "*.dll")
where !f.ToLowerInvariant().Contains ("thunk")
where !f.ToLowerInvariant().Contains ("wrapper")
select Assembly.LoadFrom (f) into a
from t in a.GetExportedTypes()
where t.IsValueType && typeof(IEnumerator).IsAssignableFrom (t)
select new { a.GetName().Name, t.FullName }

Running the query in LINQPad will yield the following results.

[Image: LINQPad results listing value-type enumerators]

It would seem like a CRAZY! thing to do, given that an enumerator is, by its very nature, mutable… I'll try to reason about why this might be beneficial; then we will look at why it sucks.

Possible Reasoning

I can think of two possible reasons why they might have chosen to do this.

1. It could be considered a “feature”, in that if you ever wanted to remember or save the current state of a collection’s enumerator, you could just copy it to another variable. I’m going to rule this out; the behaviour isn’t consistent with other enumerators (arrays, non-generic collections and, later, iterator blocks).

2. Another possibility is performance. Perhaps the BCL team were looking to avoid the heap allocation caused by calling GetEnumerator.

It would seem like a “micro optimisation”; does that tiny heap allocation really cause an issue? I’m going to look at a popular design pattern that I think might expose the problem they were trying to solve.

The Composite Design Pattern

Consider a C# implementation of the Composite pattern;

// Composite design pattern
class Node
{
    public readonly ArrayList Children = new ArrayList();

    public void Recurse()
    {
        foreach(Node child in Children)
        {
            child.Recurse();
        }
    }
}

Now let's consider a simple usage (1,000 nodes, each with 1,000 children).

var root = new Node();
for (int i = 0; i < 1000; i++)
{
    var child = new Node();
    for (int j = 0; j < 1000; j++)
    {
        child.Children.Add(new Node());
    }
    root.Children.Add(child);
}

var before = GC.CollectionCount(0);
root.Recurse();
var after = GC.CollectionCount(0);
Console.WriteLine("Collection Count: " + (after - before));

OUTPUT

[Image: console output, Collection Count: 7]

That’s right, 7 GCs just to recurse our composite tree structure! Yikes!

If we change from ArrayList to List<object>, you can see the difference.

[Image: console output after switching to List<object>]
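To make the comparison concrete, here is the same composite built over List&lt;Node&gt;. List&lt;T&gt;.GetEnumerator returns a struct enumerator, so the foreach below allocates no enumerator object per node. (Recurse returns a node count here purely so the sketch is checkable; the original returned void.)

```csharp
using System.Collections.Generic;

// Composite pattern over List<T>: foreach binds to List<T>.Enumerator,
// a struct, so no enumerator object is allocated per recursion.
class Node
{
    public readonly List<Node> Children = new List<Node>();

    public int Recurse()
    {
        int visited = 1; // count this node
        foreach (Node child in Children)
        {
            visited += child.Recurse();
        }
        return visited;
    }
}
```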

Is this a realistic scenario? Can anyone think of a LARGE instance of the composite design pattern present in many of today's .NET applications? I'll give you a clue: it starts with W… and ends in PF.

WPF & Silverlight are textbook implementations of the composite design pattern; can you imagine how many times the tree is traversed in this manner? Can you imagine how deep the tree is for a complex user interface? In my mind, this is a good theory for explaining why, at the very least, the “presentation core” enumerators have been implemented in this manner. I think most developers would have implemented a composite design pattern using List<T> at some stage or another.

Possible Problems

So is it likely that someone would trip over this optimisation? Let's start with the obvious ones before moving to hell.

List<int>.Enumerator e1 = new List<int>{1,2,3,4,5}.GetEnumerator();
List<int>.Enumerator e2 = e1;
e1.MoveNext();
Console.WriteLine(e1.Current);
Console.WriteLine(e2.Current);

This will output 1 & 0. The team probably decided to take this hit; after all, any normal person would write this.

IEnumerator<int> e1 = new List<int>{1,2,3,4,5}.GetEnumerator();
IEnumerator<int> e2 = e1;
e1.MoveNext();
Console.WriteLine(e1.Current);
Console.WriteLine(e2.Current);

This causes the value type to be boxed, meaning the “copy by value” semantics disappear and we get two references to the same enumerator, giving us the expected output of 1 & 1. But wait a second, most developers in .NET 3.5 / C# 3.0 will write this;

var e1 = new List<int>{1,2,3,4,5}.GetEnumerator();
var e2 = e1;
e1.MoveNext();
Console.WriteLine(e1.Current);
Console.WriteLine(e2.Current);

Uh oh! GetEnumerator returns List<int>.Enumerator meaning the inferred type is the value type, we’re back to square one!

I guess you could argue, well at least the problem is localised to this statement; it isn’t like people are passing these around from one method to another.

Oh wait, but what about the inference of generic type parameters! This is heavily leveraged by technologies like LINQ & Rx (NOTE: this is how I came up with my recent pop quiz!).

Consider the following;

    public class Wrapper<T> where T : IEnumerator
    {
        private readonly T enumerator;

        public Wrapper(T enumerator)
        {
            this.enumerator = enumerator;
        }

        public object Current
        {
            get { return enumerator.Current; }
        }

        public bool MoveNext()
        {
            return enumerator.MoveNext();
        }
    }

 

Impromptu Pop Quiz

I don’t want to box the enumerator… we only have one storage location.

private readonly T enumerator;

Will this wrapper work for enumerators that are value types?

Here is a complete program for you to play with.

using System;
using System.Collections.Generic;
using System.Collections;

namespace ConsoleApplication30
{
    class Program
    {
        private static void Main()
        {
            var w = Wrapper.Create(new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }.GetEnumerator());
            w.MoveNext();
            Console.WriteLine("Expected value: 1");
            Console.WriteLine("Actual value: " + w.Current);
        }
    }

    public static class Wrapper
    {
        public static Wrapper<T> Create<T>(T enumerator)
            where T : IEnumerator
        {
            return new Wrapper<T>(enumerator);
        }
    }

    public class Wrapper<T> where T : IEnumerator
    {
        private readonly T enumerator;

        public Wrapper(T enumerator)
        {
            this.enumerator = enumerator;
        }

        public object Current
        {
            get { return enumerator.Current; }
        }

        public bool MoveNext()
        {
            return enumerator.MoveNext();
        }
    }
}

Look forward to your replies!

Financial Charts & Reactive Extensions

 

Introduction

The price of a financial instrument is not a fixed value. It is something that changes over time, driven by market forces. It’s quite common to view this as a chart that is updated as the price changes. In this post I will demonstrate how to sample an observable stream of prices and display the results in a real-time charting application.

To start with, lets consider a basic chart (aka tick chart), where we simply plot each movement in the underlying instrument.

[Image: tick chart]

This is fine when looking at small samples of data, but for volatile instruments, we might receive many prices per second. This can quickly become unwieldy. The computer might not have enough memory to plot days, months or years of prices. This will also show too much noise to the user, with constant price fluctuations that do not represent the sentiment of the market.

We need to sample the data in some way. The simplest approach is just to take the latest price at regular intervals. Rx has an operator that behaves in the exact manner – Sample.

[Image: chart of sampled prices]

This solution isn’t really suitable for technical analysis as we are diluting the information to an extent that it is difficult to deduce the overall market trend. Unfortunately analysts and traders are quite fussy! What they need is a chart that represents price performance for a specific period in a single point on the x-axis.

 

OHLC Charts

I’ve chosen this particular type of chart as it is relatively simple. All we need to do is partition our data into regular time periods and determine four values;

[Image: the four OHLC values for a period]

We can then plot these quadruples as a price bar or candlestick. A “price bar” is a line representing the highest & lowest prices over a period of time, with “tick marks” showing the opening and closing price.

[Image: a price bar]

The candlesticks I mentioned are very similar, however the body is shaded to signify a positive/negative movement.

[Image: a candlestick]

Let's see what our original chart would look like using price bars.

[Image: tick chart with price bars overlaid]

And now we can remove the original points…

[Image: price bar chart]

I hope it’s clear how that was done. Additionally, it’s quite common to colour the bars so that negative movements between the open and close are in red, and positive movements are in a happy colour like blue or green. As I mentioned earlier, this is essential for the candlestick variant of the chart. Ultimately we want to be able to build applications that plot real-time data like this;

[Images: real-time charting application]

Now that you know what we are building, let's work out how the Rx code is going to work. Hold on to your hat!

 

Generate

First up, we’re going to need a stream of observable prices to play with. I’d like my price stream to start at $5. Then every 10th of a second, I’d like to apply a random adjustment to the price.

var rand = new Random();

var prices = Observable.Generate(
    5d,
    i => i > 0, 
    i => i + rand.NextDouble() - 0.5,
    i => i,
    i => TimeSpan.FromSeconds(0.1)
);

Whoa! Let's go through this slowly.

  • The first parameter (5d) is the initial seed or starting price for our observable sequence.
  • Then we have a condition (i > 0): keep generating prices while the price is greater than zero. If the price drops to zero we can assume that the company is bankrupt.
  • Then we define how we'd like the price to change (i + rand.NextDouble() - 0.5): by a random value between -0.5 & 0.5.
  • You can then optionally transform the notification; we are not using this feature (i => i).
  • Finally we can supply a time interval (TimeSpan.FromSeconds(0.1)), telling Generate when we'd like to yield the next iteration in our observable sequence. We could randomise this as well; I'm just going to generate a new price every 0.1 seconds.

Essentially Generate is a parameterised factory method that allows you to “corecursively” define an observable sequence. The function parameters passed into the method define the behaviour (the single next step) of the observable sequence, much like a mathematical series. In functional programming this concept is known as “anamorphism” or “unfold”. Generate is anamorphism for observable sequences!
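If the “unfold” framing is new, the pull-based analogue over IEnumerable&lt;T&gt; may make it click. This sketch of mine mirrors Generate's seed / condition / iterate / result-selector parameters, minus the time interval:

```csharp
using System;
using System.Collections.Generic;

public static class Unfold
{
    // Pull-based analogue of Observable.Generate: corecursively produce a
    // sequence from a seed, a continue-condition, an iterate step and a
    // result selector.
    public static IEnumerable<TResult> Generate<TState, TResult>(
        TState seed,
        Func<TState, bool> condition,
        Func<TState, TState> iterate,
        Func<TState, TResult> resultSelector)
    {
        for (TState state = seed; condition(state); state = iterate(state))
        {
            yield return resultSelector(state);
        }
    }
}
```

For example, Unfold.Generate(1, i => i <= 16, i => i * 2, i => i) yields 1, 2, 4, 8, 16.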

If you drop the code in LINQPad, you should see an output like this.

[Image: LINQPad output of generated prices]

If you don’t have LINQPad, you could write a small console application to test this out.

 

Buffer

So now that we have some test data to work with, let's look at how we can calculate the Open, High, Low & Close (OHLC) prices. Last August I did this using Rx's Buffer operator. Reactive Extensions has evolved significantly since then. Here is a quote from my old post.

“Rx handles this type of problem perfectly via the “Buffer” operator.”

We had some code similar to this;

from buffer in prices.Buffer(TimeSpan.FromSeconds(1))
select new
{
    Open = buffer.First(),
    High = buffer.Max(),
    Low = buffer.Min(),
    Close = buffer.Last()
}

You can drop this in LINQPad with our test data.

[Image: LINQPad output of buffered OHLC values]

This works; however, in retrospect, “perfectly” was a little far from the truth. Let's take a closer look at how Buffer works…

As we subscribe to the query, the Buffer operator will create a list. All notifications from the underlying source will be placed into this list until the timer elapses. At this point, the observer will be given the list containing all the notifications. We can then interact with this list, in this example, to calculate our OHLC values. The following code might help you understand how the Buffer operator works; I'd recommend pasting it into LINQPad and having a play around with it.

Code

var source = new Subject<char>();
var timer = Observable.Interval(TimeSpan.FromSeconds(1));
var buffer = source.Buffer(() => timer);
buffer.Subscribe(Console.WriteLine);

source.OnNext('a');
source.OnNext('b');
source.OnNext('c');

Thread.Sleep(1100);

source.OnNext('d');
source.OnNext('e');
source.OnNext('f');
source.OnCompleted();

Pictures

[Marble diagram: Buffer]

This approach is fine when we have three notifications per 1 second interval, but what will happen if we have a volatile price and, say, a 1 minute interval? Our buffer is completely unbounded. This could lead to our short-lived notifications being promoted into the generation 2 heap. If the buffer gets big enough, this could even result in a large object heap allocation.

Buffer has its uses, but in this case our open, high, low & close computation doesn't need all the values at once. We could perform the same calculation by stashing the first and last values (open & close) and tracking the min & max (high & low) values over the specified interval. What we need is an operator that allows us to react to the values over a window of time.

 

Window

Last December the Rx Team gave us a Christmas present focused around “Programming Streams of Coincidence”. There are some pretty powerful tools in there. If you are interested Lee Campbell has a post covering this family of operators. For this problem, we are interested in Window, which really is perfect for calculating OHLC over an observable stream of prices. Buffer & Window have a few different overloads, but these are the two we are talking about here;

IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan)
IObservable<IObservable<T>> Window<T>(this IObservable<T> source, TimeSpan timeSpan)

So how do these two operators differ? Buffer creates a list at each window opening and passes it to the observer when the window closes. In contrast, the Window operator creates a subject at each window opening and passes it to the observer immediately. The subject acts as a conduit, allowing the operator to pipe each notification to the observer until the window closes. Here is the Buffer marble diagram again, this time alongside Window;

[Marble diagram: Buffer vs. Window]

Window is more powerful than Buffer as the observer can decide how to process the notifications within the context of the window. This can be achieved by applying SelectMany to the Window operator.

Query Comprehension Syntax

from window in source.Window(timeSpan)
from ? in window.?
select ?

Lambda Syntax

source.Window(timeSpan).SelectMany(window => ?, (x, ?) => ?)

Pictures

[Marble diagram: Window and SelectMany]

Interestingly, the Buffer overload we've been discussing is actually implemented using this technique. Let's use it as an example;

Query Comprehension Syntax

// Implement Buffer using Window
public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan)
{
    // Transform IObservable<IObservable<T>> -> IObservable<IList<T>>
    return 
        from window in source.Window(timeSpan)
        from buffer in window.ToList()
        select buffer;
}

Lambda Syntax

source.Window(timeSpan).SelectMany(window => window.ToList())

Pictures

[Marble diagram: Buffer implemented using Window]

 

You can write any query you want over the window observable. Maybe the observer is only interested in the last value; that is semantically equivalent to Sample;

Query Comprehension Syntax

// Implement Sample using Window
public static IObservable<T> MySample<T>(this IObservable<T> source, TimeSpan timeSpan)
{
    return
        from window in source.Window(timeSpan)
        from last in window.TakeLast(1)
        select last;
}

Lambda Syntax

source.Window(timeSpan).SelectMany(window => window.TakeLast(1))

Pictures

[Marble diagram: Sample implemented using Window]

 

Aggregate

OK so we have a mechanism that effectively divides a stream of events into smaller streams of events based on windows of time. All we need is a query that we can run over each observable window. Remember Buffer was implemented using ToList? Well ToList is actually implemented using Aggregate!

In fact all sorts of things can be implemented using Aggregate. Remember the Generate (anamorphism) method we used to create our test data? Well it’s actually the dual to Aggregate! While Generate takes a seed and applies some functions to produce a series of notifications, Aggregate takes a series of notifications and recursively applies an accumulator function, until it reaches the end of the sequence, at which point it yields a result. Aggregate is “catamorphism or fold” for observable sequences!

[Diagram: mathematical duals, Aggregate/catamorphism/fold vs. Generate/anamorphism/unfold]

 

We are going to use Aggregate to compute the OHLC values for each Window of prices. Before we do that, let's just make sure everyone is comfortable.

Aggregate Method Signature

IObservable<TAccumulate> Aggregate<TSource, TAccumulate>(
    this IObservable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> accumulator)

Marble Diagram

[Marble diagram: Aggregate]

Confused? Scared? Don’t be…

Let's look at an example. Here is Sum implemented using Aggregate.

Sum

Observable.Range(1,3).Aggregate(0, (sum, value) => sum + value)

[Image: LINQPad output of the sum (6)]

As I mentioned earlier, ToList is also implemented using Aggregate.

ToList

Observable.Range(1,3).Aggregate(new List<int>(), (list, value) =>
{
    list.Add(value);
    return list;
})

[Marble diagram: ToList]

If you’ve not grokked it, try implementing some other basic operators like Min & Max.
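As a worked answer to that exercise, here is Max written as a fold. I've used the pull-based Enumerable.Aggregate so the snippet runs anywhere; the observable version uses the same accumulator:

```csharp
using System.Collections.Generic;
using System.Linq;

// Max as a fold: carry the best-so-far value through the accumulator.
static class Folds
{
    public static int Max(IEnumerable<int> source)
    {
        // The unseeded Aggregate overload uses the first element as the seed.
        return source.Aggregate((best, value) => value > best ? value : best);
    }
}
```

Min is identical with the comparison flipped.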

 

OHLC (Window + SelectMany + Aggregate)

So can we use Aggregate for our OHLC calculations? Let's start with the easy stuff by defining the fields we'll need to keep track of these four values.

class OHLC
{
    public double? Open;
    public double? High;
    public double? Low;
    public double Close;
}

Now we just need a function that takes the current values & a price and produces the new values.

// (TAccumulate, TSource) -> TAccumulate
static OHLC Accumulate(OHLC state, double price)
{
    // Take the current values & apply the price update.
    state.Open = state.Open ?? price;                                        // first price in the window
    state.High = state.High.HasValue ? Math.Max(state.High.Value, price) : price;
    state.Low = state.Low.HasValue ? Math.Min(state.Low.Value, price) : price;
    state.Close = price;                                                     // last price in the window
    return state;
}

If we bring this together with Window, SelectMany & Aggregate, we’ve now got a query that takes a stream of prices, splits it into windows and calculates OHLC values.

from window in prices.Window(TimeSpan.FromSeconds(1))
from result in window.Aggregate(new OHLC(), Accumulate)
select result

[Image: LINQPad output of windowed OHLC results]

 

For readers who are not using LINQPad, I’ve written a console application. You can download it here;

Console Application Code

using System;

namespace ConsoleApplication124
{
    using System.Reactive.Linq;

    class Program
    {
        static void Main()
        {
            var rand = new Random();
            var prices = Observable.Generate(
                5d, i => i > 0, i => i + rand.NextDouble() - 0.5, i => i, i => TimeSpan.FromSeconds(0.1));

            var query = from window in prices.Window(TimeSpan.FromSeconds(1))
                        from result in window.Aggregate(new Ohlc(), Accumulate)
                        select result;
            query.Subscribe(Console.WriteLine);
            Console.ReadLine();
        }

        class Ohlc
        {
            public double? Open;
            public double? High;
            public double? Low;
            public double Close;

            public override string ToString()
            {
                return (new { Open, High, Low, Close }).ToString();
            }
        }

        static Ohlc Accumulate(Ohlc current, double price)
        {
            current.Open = current.Open ?? price;
            current.High = current.High.HasValue ? current.High > price ? current.High : price : price;
            current.Low = current.Low.HasValue ? current.Low < price ? current.Low : price : price;
            current.Close = price;
            return current;
        }
    }
}

Plotting The Results

Finally, we’re going to plot these results using the charting controls that come with .NET 4.0. Unfortunately (and strangely) these controls are only available to Windows Forms developers. It appears that they will be available in a future version of WPF; there is a preview release available here. Alternatively there are lots of 3rd party charting packages that offer similar functionality. I’ll try these out in a future post. For now I’m going to use the Windows Forms controls; conceptually there shouldn’t be much of a difference.

First let's prepare the project.

1. Create a new Windows Forms project.

[Screenshot: new Windows Forms project]

2. Add references to Reactive Extensions.

[Screenshot: Add Reference dialog]

Your references should now look something like this;

[Screenshot: project references]

3. Drop a Chart control onto the form.

[Screenshot: Chart control on the form]

4. I’m going to get rid of the Legend.

[Screenshot: Legends collection editor]

Click remove.

[Screenshot: legend removed]

 

The Code

In the introduction I talked about two different chart types. Both are supported by this control library.

series.ChartType = SeriesChartType.Candlestick;
series.ChartType = SeriesChartType.Stock;

For testing purposes, I'm going to set the resolution of the time axis so that it will work with 1 second intervals. A window that small isn't useful in the real world (the smallest I've seen is 1 minute), but we want to see some results straight away.

series.XValueType = ChartValueType.Time;
var area = chart1.ChartAreas[0];
area.Axes[0].Title = "Time";
area.AxisX.LabelStyle.IntervalType = DateTimeIntervalType.Seconds;
area.AxisX.LabelStyle.Format = "T";

Finally we need to subscribe to our query & test data and populate the chart.

query.ObserveOn(this).Subscribe(x => series.Points.AddXY(DateTime.Now, x.High, x.Low, x.Open, x.Close));

If you put all this code in the form’s constructor and run the project, you should have something like this;

using System;
using System.Windows.Forms;
using System.Reactive.Linq;
using System.Windows.Forms.DataVisualization.Charting;

namespace Chart
{
    public partial class Form1 : Form
    {

        public Form1()
        {
            InitializeComponent();

            // Configure the chart
            var series = chart1.Series[0];
            series.ChartType = SeriesChartType.Candlestick;
            series.XValueType = ChartValueType.Time;
            var area = chart1.ChartAreas[0];
            area.Axes[0].Title = "Time";
            area.AxisX.LabelStyle.IntervalType = DateTimeIntervalType.Seconds;
            area.AxisX.LabelStyle.Format = "T";

            
            // Test prices
            var rand = new Random();
            var prices = Observable.Generate(5d, i => i > 0, i => i + rand.NextDouble() - 0.5, i => i, i => TimeSpan.FromSeconds(0.1));

            // OHLC query
            var query =
                from window in prices.Window(TimeSpan.FromSeconds(1))
                from ohlc in window.Aggregate(new OHLC(), Accumulate)
                select ohlc;

            // Subscribe & display results
            query.ObserveOn(this).Subscribe(x => series.Points.AddXY(DateTime.Now, x.High, x.Low, x.Open, x.Close));

        }

        class OHLC
        {
            public double? Open;
            public double? High;
            public double? Low;
            public double Close;
        }

        static OHLC Accumulate(OHLC current, double price)
        {
            current.Open = current.Open ?? price;
            current.High = current.High.HasValue ? current.High > price ? current.High : price : price;
            current.Low = current.Low.HasValue ? current.Low < price ? current.Low : price : price;
            current.Close = price;
            return current;
        }
    }
}

Run the application;

[Screenshot: running chart application]

 

Additionally we can apply a suitable colour scheme to the chart;

// Colours
chart1.BackColor = Color.Black;
chart1.ChartAreas[0].Axes[0].LineColor = Color.LimeGreen;
chart1.ChartAreas[0].Axes[0].TitleForeColor = Color.LimeGreen;
chart1.ChartAreas[0].AxisX.MajorTickMark.LineColor = Color.LimeGreen;
chart1.ChartAreas[0].AxisX.LabelStyle.ForeColor = Color.LimeGreen;
chart1.ChartAreas[0].Axes[1].LineColor = Color.LimeGreen;
chart1.ChartAreas[0].Axes[1].TitleForeColor = Color.LimeGreen;
chart1.ChartAreas[0].AxisY.MajorTickMark.LineColor = Color.LimeGreen;
chart1.ChartAreas[0].AxisY.LabelStyle.ForeColor = Color.LimeGreen;
chart1.ChartAreas[0].BackColor = Color.Black;
series["PriceDownColor"] = "Red";

[Screenshot: chart with the new colour scheme]

 

Download

You can get a working demo application here. As discussed I will provide a WPF based solution soon.

I’ve tried to make this article useful both for readers who know Rx but nothing about finance, and for those who know finance but are not familiar with Rx. I’m not sure if it really works, so I’m interested to hear feedback from both camps.

More soon.

New Rxx Release (It’s a big one!)

We couldn’t let Rx go live without an Rxx refresh and boy is it a big one! Dave Sexton & I are proud to announce Rxx 1.1. Firstly let me congratulate Dave. This is a major milestone for us and he has put in a huge effort, much of which has been enhancing our build process. Rxx is now available in the following configurations!

Build Configurations

  • .NET Stable
  • .NET Experimental
  • Silverlight Stable
  • Silverlight Experimental
  • Windows Phone 7 Stable
  • Windows Phone 7 Experimental

Dave has spent the last month mirroring the Rx team’s build process, including documentation & labs, ported to their respective platforms. I guess we’re going to have to support XNA next! Of course there would be no point in doing all this if we didn’t have some useful features. Don’t worry there are plenty of new ones.

Major New Features

  • Rx Parsers
  • Rx Dns
  • Rx Smtp Clients
  • Rx Sockets
  • Rx Web Requests
  • Rx Network Changes
  • Rx Ping
  • A new type of Multicasting that allows state to be cleared when the observable goes “cold”.

Technologies

Along with our new build process, we’ve incorporated a range of technologies to ensure that Rxx remains a high quality project;

  • Code Contracts
  • StyleCop Analysis
  • FxCop Analysis
  • Sandcastle Documentation
  • MS Test

Additionally Dave Sexton’s Labs abstraction means that our interactive labs run on each platform.

What’s next?

I’m really proud to be part of a project that is shaping up to be one of the most professional looking open source projects I’ve seen. The work that’s gone into our project in this release gives us a great platform to build on going forward. Head over to the project page & as always, we are eagerly awaiting any feedback or ideas you might have.

Links

Rx Performance Improvements

Rx being a technology that enables us to easily write multithreaded, concurrent applications, a question often asked is;

“how fast is this stuff?”

As Rx completes its metamorphosis from “labs project” to “official product”, it seems the team has started focusing on performance. This is probably quite normal as the project shifts from designing the perfect API to supporting a stable product.

I thought I’d share some benchmarks on where some of these improvements have been made.


Subjects

Subjects are used everywhere in Rx, so they’re a good candidate for optimisation, giving us across-the-board performance gains. I’m testing throughput by publishing 1 million notifications.

// Subject thoughput performance test
var subject = new Subject<int>();
subject.Subscribe(_ => { });
var sw = Stopwatch.StartNew();
for (int i = 0; i < 1000000; i++)
{
    subject.OnNext(i);
}
Console.WriteLine(sw.Elapsed);

Results

1.0.2698.104 – 2.38 seconds

1.0.2856.104 (FastSubject) – 0.12 seconds

1.0 Stable – 0.04 seconds


The performance increase is due to two factors. As Wes & Bart explained in their latest video, they have removed the responsibility of scheduling from the subject implementations. This accounts for the improvement between 2698 & the 2856 FastSubject. The next improvement is due to a reduction in memory allocation. Subjects used to make a copy of the observers array before exiting the lock and publishing the notification to all the observers. This has been replaced with an immutable list that can be safely accessed outside of the lock. The immutable list is replaced wholesale when observers subscribe/unsubscribe, favouring throughput over the subscribe & unsubscribe operations. This makes sense if you think about it; it’s really just how multicast delegates work. Hmmm, more improvements to come?
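The copy-on-write idea can be sketched like this. This is an illustrative toy, not Rx’s actual implementation: subscribe/unsubscribe swap in a whole new array under a lock, while OnNext reads the current array without locking.

```csharp
using System;
using System.Collections.Generic;

// Toy subject illustrating copy-on-write observer lists (not Rx's real code):
// the observers array is only ever replaced, never mutated, so publication
// can read a snapshot without taking the lock.
public class CopyOnWriteSubject<T>
{
    readonly object gate = new object();
    volatile Action<T>[] observers = new Action<T>[0];

    public IDisposable Subscribe(Action<T> onNext)
    {
        lock (gate)
        {
            var next = new Action<T>[observers.Length + 1];
            Array.Copy(observers, next, observers.Length);
            next[next.Length - 1] = onNext;
            observers = next;                  // swap in the new array
        }
        return new Unsubscriber(this, onNext);
    }

    public void OnNext(T value)
    {
        var snapshot = observers;              // lock-free read on the hot path
        foreach (var observer in snapshot)
            observer(value);
    }

    sealed class Unsubscriber : IDisposable
    {
        readonly CopyOnWriteSubject<T> subject;
        readonly Action<T> onNext;

        public Unsubscriber(CopyOnWriteSubject<T> subject, Action<T> onNext)
        {
            this.subject = subject;
            this.onNext = onNext;
        }

        public void Dispose()
        {
            lock (subject.gate)
            {
                var list = new List<Action<T>>(subject.observers);
                list.Remove(onNext);
                subject.observers = list.ToArray();
            }
        }
    }
}
```

Subscribing and unsubscribing pay an allocation cost, but the OnNext hot path never takes the lock, which is exactly the trade-off the benchmark rewards.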


Schedulers

Also discussed in the video are the changes to the IScheduler interface. I’m testing the scenario where we want to adapt an enumerable into an observable using the NewThreadScheduler.

var sw = Stopwatch.StartNew();
Enumerable.Range(1, 10000)
    .ToObservable(Scheduler.NewThread)
    .Count()
    .Single();
Console.WriteLine(sw.Elapsed);

Results

1.0.2856.104 – 3.43 seconds

v1.0 Stable – 0.14 seconds


The reason for this gain is pretty simple. The old scheduling API didn’t provide the action being executed with any contextual information about the thread it was running on. This meant that for NewThreadScheduler to work properly, ToObservable would have had to be hard-wired to create a dedicated event loop when the caller subscribes. But that would fly in the face of the whole idea of parameterising concurrency. The IScheduler interface has been overhauled to accommodate scenarios like this.

Was

public interface IScheduler
{
    IDisposable Schedule(Action action);
    IDisposable Schedule(Action action, TimeSpan dueTime);
    DateTimeOffset Now { get; }
}

Now

public interface IScheduler
{
    IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action);
    IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action);
    IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action);
    DateTimeOffset Now { get; }
}

Don’t panic, you probably won’t ever need to implement this guy, and extension methods allow you to keep using the old API (which I really like!), for example;

Scheduler.NewThread.Schedule(() => DoStuff());
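To see what the state parameter and the scheduler argument buy you, here is a self-contained toy (hypothetical names, not Rx’s types) demonstrating the recursive-scheduling pattern the new signature enables: the action receives the scheduler it is running on and can reschedule itself on it, without any hard-wired event loop.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical mini-version of the new scheduler shape, for illustration only.
interface IMyScheduler
{
    IDisposable Schedule<TState>(TState state, Func<IMyScheduler, TState, IDisposable> action);
}

sealed class NopDisposable : IDisposable
{
    public static readonly NopDisposable Instance = new NopDisposable();
    public void Dispose() { }
}

// Runs work inline; a real scheduler would queue it, trampoline it,
// or hand it to a dedicated thread.
sealed class ImmediateScheduler : IMyScheduler
{
    public IDisposable Schedule<TState>(TState state, Func<IMyScheduler, TState, IDisposable> action)
    {
        return action(this, state);
    }
}

public static class RecursiveScheduleDemo
{
    // Counts 0..max by rescheduling itself on whatever scheduler it was given.
    public static List<int> CountTo(IMyScheduler scheduler, int max)
    {
        var seen = new List<int>();
        Func<IMyScheduler, int, IDisposable> loop = null;
        loop = (sch, i) =>
        {
            seen.Add(i);
            return i < max
                ? sch.Schedule(i + 1, loop)              // recurse via the scheduler
                : (IDisposable)NopDisposable.Instance;   // done
        };
        scheduler.Schedule(0, loop);
        return seen;
    }
}
```

`RecursiveScheduleDemo.CountTo(new ImmediateScheduler(), 3)` yields 0, 1, 2, 3. The same loop would run on a thread-pool or event-loop scheduler unchanged, which is the point of parameterising concurrency.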


ObserveOn

This wasn’t discussed in the video but I thought it was worth mentioning. ObserveOn has been redesigned so that it makes use of SchedulerObserver&lt;T&gt;. I always found it strange that this serialization code was duplicated; they’ve addressed that. Shame SchedulerObserver&lt;T&gt; has not been made public though. I think it would be a useful building block for people building their own operators. Anyway, the test code;

var sw = Stopwatch.StartNew();
Observable.Range(1,10000)
    .ObserveOn(Scheduler.ThreadPool)
    .Count()
    .Single();
Console.WriteLine(sw.Elapsed);

Results

1.0.2856.104 – 0.8 seconds

v1.0 Stable – 0.2 seconds


I can’t help but feel that there is still more we can squeeze out of ObserveOn. I’ve been playing around with some alternate APIs for this. They will probably find their way into Rxx shortly. More on this soon.
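The core idea behind a scheduled, serializing observer can be sketched as follows. This is a deliberate simplification of what something like SchedulerObserver&lt;T&gt; has to do, not Rx’s actual code: producers enqueue notifications under a lock, and at most one drain pass runs at a time, so the downstream callback never sees concurrent calls.

```csharp
using System;
using System.Collections.Generic;

// Simplified sketch of a serializing observer (not Rx's implementation):
// producers enqueue under a lock; at most one drain pass runs at a time,
// guaranteeing the downstream callback is invoked serially.
public sealed class SerializingObserver<T>
{
    readonly object gate = new object();
    readonly Queue<T> queue = new Queue<T>();
    readonly Action<T> onNext;
    bool draining;

    public SerializingObserver(Action<T> onNext)
    {
        this.onNext = onNext;
    }

    public void OnNext(T value)
    {
        lock (gate)
        {
            queue.Enqueue(value);
            if (draining) return;   // another caller is already draining
            draining = true;
        }
        Drain();                    // in Rx this pass would itself be scheduled
    }

    void Drain()
    {
        while (true)
        {
            T next;
            lock (gate)
            {
                if (queue.Count == 0) { draining = false; return; }
                next = queue.Dequeue();
            }
            onNext(next);           // invoked outside the lock, one at a time
        }
    }
}
```

In the real operator the drain pass is handed to an IScheduler rather than run inline, which is where the scheduler’s overhead (and the remaining room for optimisation) lives.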


Conclusion

The last few builds have yielded some great improvements, and I’m sure there are more to come. If you run into any performance issues I’d encourage you to hit the forums. More soon.
