Structs that implement IDisposable

A few weeks ago some friends and I were at the pub discussing value types that implement IDisposable. Consider the IObservable interface.

interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

One *might* consider using a disposable value type to take some pressure off the GC.

struct SlimDisposable : IDisposable
{
  // ....    
}

Unfortunately this line of thinking is flawed. Because Subscribe returns IDisposable, the value type will be boxed as an IDisposable, resulting in the heap allocation we were trying to avoid. Let’s pretend our interface looked more like this;

interface IObservable<T>
{
    SlimDisposable Subscribe(IObserver<T> observer);
}

Or maybe;

interface IObservable<TValue,TDisposable> where TDisposable : IDisposable
{
    TDisposable Subscribe(IObserver<TValue> observer);
}

Now we could reference SlimDisposable without it being boxed! But is this a good thing? Unfortunately creating disposable value types is fraught with danger!
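To make the boxing point concrete, here is a minimal sketch. CountingDisposable, DisposeViaInterface and DisposeGeneric are hypothetical names invented for this illustration; they are not part of Rx or the BCL. It shows that converting a struct to IDisposable boxes a copy, whereas a generic method constrained to IDisposable operates on the struct in place.

```csharp
using System;

// Hypothetical struct for illustration: counts how often Dispose was called.
struct CountingDisposable : IDisposable
{
    public int DisposeCalls;
    public void Dispose() { DisposeCalls++; }
}

static class BoxingDemo
{
    // Converting the struct to IDisposable boxes it: Dispose runs on a heap copy.
    public static int DisposeViaInterface(CountingDisposable d)
    {
        IDisposable boxed = d;   // boxing allocation happens here
        boxed.Dispose();         // mutates the boxed copy only
        return d.DisposeCalls;   // the unboxed value is untouched
    }

    // A constrained generic call works on the struct in place, with no box.
    public static void DisposeGeneric<T>(ref T d) where T : IDisposable
    {
        d.Dispose();
    }

    public static void Main()
    {
        var a = new CountingDisposable();
        Console.WriteLine(DisposeViaInterface(a)); // 0

        var b = new CountingDisposable();
        DisposeGeneric(ref b);
        Console.WriteLine(b.DisposeCalls);         // 1
    }
}
```

The generic version avoids the allocation, but it also makes the mutable state problem discussed next much easier to run into.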

Eric Lippert has a great post on the subject: “To box or not to box, that is the question”.

To quote MSDN: “To help ensure that resources are always cleaned up appropriately, a Dispose method should be callable multiple times without throwing an exception.”

This means that a disposable object usually needs to mutate & track some state to determine if it has been disposed or not. But because we are using a value type, this state will be copied whenever we do an assignment. Consider the following;

public struct Disposable : IDisposable
{
    public bool IsDisposed;

    public void Dispose()
    {
        if(!IsDisposed)
        {
            Console.WriteLine("Disposing Resources");
            IsDisposed = true;
        }
    }
}

Now let’s say you did something like this;

var d1 = new Disposable(); 
var d2 = d1; 
d1.Dispose(); 
Console.WriteLine("d1 == {0}, d2 == {1}", d1.IsDisposed, d2.IsDisposed);

OUTPUT

Disposing Resources

d1 == True, d2 == False

We now have two states & only 1 underlying resource!

Purely as an educational exercise, we decided to write a query that would find all the value types in the .NET framework that implement IDisposable. It turns out there are in fact quite a few.

See for yourself;

from f in Directory.GetFiles (
       Path.Combine (
             Environment.GetFolderPath (System.Environment.SpecialFolder.ProgramFilesX86), 
             @"Reference Assemblies\Microsoft\Framework\.NETFramework\v4.0"),
       "*.dll")
where !f.ToLowerInvariant().Contains ("thunk")
where !f.ToLowerInvariant().Contains ("wrapper")
select Assembly.LoadFrom (f) into a
from t in a.GetExportedTypes()
where t.IsValueType && typeof(IDisposable).IsAssignableFrom (t)
select new { a.GetName().Name, t.FullName }

Capture

We’ve got some interesting types here;

System.Threading.AsyncFlowControl

System.Threading.CancellationTokenRegistration

System.Windows.Threading.DispatcherProcessingDisabled

Both CancellationTokenRegistration & DispatcherProcessingDisabled get around the mutable state problem; they are in fact immutable value types. They use the state of a parent object (a reference type) to determine if they have been disposed or not. This means they can be assigned/copied safely.
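The pattern can be sketched roughly as follows. This is not how the BCL types are written; SharedResource and ResourceHandle are hypothetical names, and the sketch only assumes what is described above: the struct itself is immutable and the "disposed" flag lives on a reference type that every copy shares.

```csharp
using System;
using System.Threading;

// Hypothetical reference type that owns the real "disposed" flag.
sealed class SharedResource
{
    private int _released; // 0 = live, 1 = released

    public bool IsReleased
    {
        get { return Volatile.Read(ref _released) == 1; }
    }

    // Returns true only for the first caller, so the cleanup runs once.
    public bool TryRelease()
    {
        return Interlocked.Exchange(ref _released, 1) == 0;
    }
}

// Immutable struct: every copy points at the same SharedResource.
struct ResourceHandle : IDisposable
{
    private readonly SharedResource _owner;

    public ResourceHandle(SharedResource owner) { _owner = owner; }

    public bool IsDisposed
    {
        get { return _owner == null || _owner.IsReleased; }
    }

    public void Dispose()
    {
        if (_owner != null && _owner.TryRelease())
            Console.WriteLine("Disposing Resources");
    }
}

static class HandleDemo
{
    public static void Main()
    {
        var d1 = new ResourceHandle(new SharedResource());
        var d2 = d1;   // copies the reference, not the flag
        d1.Dispose();
        d2.Dispose();  // no second cleanup
        Console.WriteLine("d1 == {0}, d2 == {1}", d1.IsDisposed, d2.IsDisposed);
        // prints "Disposing Resources" once, then "d1 == True, d2 == True"
    }
}
```

Compared with the earlier Disposable struct, both copies now agree on the state of the single underlying resource.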

AsyncFlowControl is in fact a mutable value type! It does however use some state on the thread that it references to determine if the control flow is suppressed or not. However it does mean you can do weird things like this;

var afc1 = ExecutionContext.SuppressFlow();
var afc2 = afc1;
afc1.Dispose();
var afc3 = ExecutionContext.SuppressFlow();
afc2.Dispose();

This will result in the flow being restored, even though we never disposed of afc3. The BCL developers may have decided that in this case, the performance and pressure on the GC was so critical, that they’d commit this sin. Transferring ExecutionContext information from one thread to another is probably considered somewhat of a performance hotspot. I suspect it is also the case that very few developers have ever used this API let alone know what an ExecutionContext is!

Anyway, the real point of interest here is, “oh my god, what are all these enumerators!”

Next time – Structs that implement IEnumerator.

More Reactive Extensions Performance Improvements

I blogged about performance when Rx was officially released a few months ago.

Last week the team released a new version (1.1.11111) where “The major focus of this release is performance-related work”.

Subject<T> now uses non-blocking synchronization!

Internally a subject has a list of subscribed observers. Traditionally, concurrent access to the list has been synchronised internally with a C# lock (Monitor). The latest release uses a compare-and-swap (CAS) operation to modify its internal state. As OnNext doesn’t modify the internal state, it doesn’t even need a CAS operation; it simply dispatches the notifications to all the observers.
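The general technique looks something like the sketch below. This is not the Rx source; CasSubject is a hypothetical, heavily simplified type (no OnError, OnCompleted or unsubscribe) that only assumes what is described above: subscribers are held in an array that is never mutated, Subscribe swaps in a new array with a CAS loop, and OnNext just reads the current snapshot.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified subject: observers are plain Action<T> callbacks.
sealed class CasSubject<T>
{
    // The array is treated as immutable; it is only ever replaced wholesale.
    private Action<T>[] _observers = new Action<T>[0];

    public void Subscribe(Action<T> observer)
    {
        while (true)
        {
            var current = Volatile.Read(ref _observers);
            var updated = new Action<T>[current.Length + 1];
            Array.Copy(current, updated, current.Length);
            updated[current.Length] = observer;

            // Publish the new array only if nobody else changed it meanwhile.
            if (Interlocked.CompareExchange(ref _observers, updated, current) == current)
                return;
            // Otherwise another thread won the race: retry against the new state.
        }
    }

    public void OnNext(T value)
    {
        // No lock and no CAS: read the snapshot and dispatch.
        var snapshot = Volatile.Read(ref _observers);
        foreach (var observer in snapshot)
            observer(value);
    }
}

static class CasDemo
{
    public static void Main()
    {
        var subject = new CasSubject<int>();
        var total = 0;
        subject.Subscribe(x => total += x);
        subject.Subscribe(x => total += x);
        subject.OnNext(42);
        Console.WriteLine(total); // 84
    }
}
```

The trade-off is that every subscribe allocates a new array, which is acceptable when notifications vastly outnumber subscriptions.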

Test code (from Rx team)

for (int n = 0; n < 10; n++)
{
    var c = new CountdownEvent(n);

    var s = new Subject<int>();
    for (int i = 0; i < n; i++)
        s.Subscribe(_ => { }, () => c.Signal());

    var sw = Stopwatch.StartNew();

    Scheduler.ThreadPool.Schedule(() =>
    {
        for (int i = 0; i < 100000000; i++)
            s.OnNext(42);
        s.OnCompleted();
    });

    c.Wait();
    sw.Stop();
    Console.WriteLine(sw.Elapsed);
}

Results

image

Reactive Extensions Extensions 1.2

 
What’s New?
  • Internalized or removed all interactive extensions that were unnecessary or that shadowed Microsoft’s Ix Experimental library.
  • Added DependencyObjectExtensions.DependencyPropertyChanged and UIElementExtensions.RoutedEventRaised extension methods and a corresponding lab.
  • Added CollectionNotification, CollectionModification, IListSubject, IDictionarySubject and concrete types.
  • Added Collect extension methods to Observable2, ObservableDirectory, DirectoryInfoExtensions and FileSystemWatcherExtensions.
  • Added ObservableDirectory lab.
  • Moved various extension methods into new classes: SmtpClientExtensions, PingExtensions, HttpListenerExtensions, WebClientExtensions, WebRequestExtensions and SocketExtensions.
  • Adjusted trace identity format. Also adjusted the default text for Ix tracing.
  • Added ObservableSyndication for RSS 2.0 and Atom 1.0, with a corresponding UI lab that also uses DictionarySubject and the Collect extension method.
  • Added ApplicationSettingsBase extensions for observing setting changes.
  • Added ICommand extensions, CommandSubject, AnonymousCommand and a corresponding lab.
  • Added the Subscription XAML markup extension for WPF, which is similar to Binding and supports observable data sources. Includes a corresponding lab.
  • Added EventSubscription trigger, which supports event handler bindings from FrameworkElement to IObserver, delegate or ICommand properties. Includes a corresponding lab.
  • Major performance and memory improvements for parsers; now avoids stack overflows due to recursion in quantifiers.
  • Added ICursor<T> and IObservableCursor<T> types with concrete implementations, including CursorSubject<T> and ToCursor extension methods.
  • Added full support for reactive XML parsers in WP7. Now there’s complete parser parity across all platforms.
  • Added view model support for all platforms, with corresponding UI labs. Includes optional IViewModel interface and optional Rxx.ViewModel base class.
  • Added Exactly parser operator.
  • Added non-greedy variants to some of the quantifying parser operators.
  • Added an overload to the AtLeast parser operator that accepts a maximum parameter, with behavior similar to the {n,m} regex pattern.
  • Added AndUnordered and AllUnordered parser operators, fixed the XML parsers so that attributes are matched in any order and reversed the order of the attributes in the XML schema labs.
  • Added Consume extensions that generalize the producer/consumer pattern over observables.
  • Added Stream, FileStream and TextReader extensions.
  • Added ObservableFile class with a corresponding lab.
  • Added n-ary Zip and CombineLatest combinators.
  • Improved lab application.

Full Release Notes Here!

Download here

NuGet Packages are also available

Observable.Generate Pop Quiz

What would be the output of the following program;

var xs = new int[]{1,2,3};
Observable.Generate(
    xs.GetEnumerator(),    // initial state
    e => e.MoveNext(),    // break condition
    e => e,            // iterate
    e => e.Current        // result selector
).Subscribe(Console.WriteLine);

The program (correctly) outputs;

1

2

3

What about this program? Would the output be the same? If not why not?

var xs = new List<int>{1,2,3};

Observable.Generate(
    xs.GetEnumerator(),    // initial state
    e => e.MoveNext(),    // break condition
    e => e,            // iterate
    e => e.Current        // result selector
).Subscribe(Console.WriteLine);

Stay tuned.

Asus U36SD notebook running Windows 8 developer preview

I’ve just set up a new laptop running Windows 8 & Visual Studio 11 developer preview.

Hardware

Asus U36SD
Intel Core i7
4GB RAM
160GB SSD
1GB NVidia 520M

http://www.asus.com/Notebooks/Superior_Mobility/U36SD/

Installation

It was really easy to get Windows 8 up and running. The laptop came with Windows 7 pre-installed.

  1. Boot into Windows 7
  2. Download 64 bit developer preview (with developer tools)
  3. Unpack the ISO onto the desktop (I used 7zip)
  4. Run “Setup.exe”
  5. It will ask if you want to check internet for latest version – “Yes”
  6. It will ask what you want to keep – “Nothing”!

In what seemed like less than 5 minutes (I didn’t time it), the machine had rebooted and I was running Windows 8!

Visual Studio 11

It comes pre-installed, so there is nothing to do there.

I’ve created a simple hello world application. I chose to build a “C# metro application”; you are presented with the familiar XAML designer & C# code behind. There is not really much to say here. Visual Studio 11 looks and feels exactly the same as 2010 except there are a bunch of new projects for “Metro applications”. WPF & Silverlight developers won’t have any difficulty slotting into this development environment. Apart from the namespaces everything is very similar.

I’ll post more once I’ve something interesting to talk about…

Financial Charts & Reactive Extensions

 

Introduction

The price of a financial instrument is not a fixed value. It is something that changes over time, driven by market forces. It’s quite common to view this as a chart that is updated as the price changes. In this post I will demonstrate how to sample an observable stream of prices and display the results in a real-time charting application.

To start with, let’s consider a basic chart (aka tick chart), where we simply plot each movement in the underlying instrument.

image

This is fine when looking at small samples of data, but for volatile instruments, we might receive many prices per second. This can quickly become unwieldy. The computer might not have enough memory to plot days, months or years of prices. This will also show too much noise to the user, with constant price fluctuations that do not represent the sentiment of the market.

We need to sample the data in some way. The simplest approach is just to take the latest price at regular intervals. Rx has an operator that behaves in exactly this manner: Sample.

image

This solution isn’t really suitable for technical analysis as we are diluting the information to an extent that it is difficult to deduce the overall market trend. Unfortunately analysts and traders are quite fussy! What they need is a chart that represents price performance for a specific period in a single point on the x-axis.

 

OHLC Charts

I’ve chosen this particular type of chart as it is relatively simple. All we need to do is partition our data into regular time periods and determine four values for each period: the open, high, low and close;

image

We can then plot these quadruples as a price bar or candlestick. A “price bar” is a line representing the highest & lowest prices over a period of time, with “tick marks” showing the opening and closing price.
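As a rough illustration of the calculation itself, independent of Rx, the sketch below partitions a list of ticks into fixed periods and works out the four values for each. Tick, Bar and BarBuilder are hypothetical names made up for this example, and the ticks are assumed to arrive already sorted by time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tick: a price observed some number of seconds after the start.
sealed class Tick
{
    public double Seconds;
    public double Price;
}

// One bar per period: the four values a price bar or candlestick needs.
sealed class Bar
{
    public double Open, High, Low, Close;
}

static class BarBuilder
{
    // Assumes ticks are in time order, so First/Last give the open and close.
    public static List<Bar> Build(IEnumerable<Tick> ticks, double periodSeconds)
    {
        return ticks
            .GroupBy(t => (long)Math.Floor(t.Seconds / periodSeconds))
            .OrderBy(g => g.Key)
            .Select(g => new Bar
            {
                Open = g.First().Price,
                High = g.Max(t => t.Price),
                Low = g.Min(t => t.Price),
                Close = g.Last().Price
            })
            .ToList();
    }

    public static void Main()
    {
        var ticks = new[]
        {
            new Tick { Seconds = 0.1, Price = 5.0 },
            new Tick { Seconds = 0.5, Price = 5.4 },
            new Tick { Seconds = 0.9, Price = 5.2 },
            new Tick { Seconds = 1.2, Price = 5.1 },
            new Tick { Seconds = 1.7, Price = 4.8 }
        };

        // First bar: open 5.0, high 5.4, low 5.0, close 5.2.
        // Second bar: open 5.1, high 5.1, low 4.8, close 4.8.
        foreach (var bar in Build(ticks, 1.0))
            Console.WriteLine("O={0} H={1} L={2} C={3}", bar.Open, bar.High, bar.Low, bar.Close);
    }
}
```

This batch version needs all the ticks for a period in memory at once, which is the limitation the rest of the post works to remove.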

image

The candlesticks I mentioned are very similar, however the body is shaded to signify a positive/negative movement.

image

Let’s see what our original chart would look like using price bars.

image

And now we can remove the original points…

image

I hope it’s clear how that was done. Additionally, it’s quite common to colour the bars so that negative movements, between the open and close, are in red and positive movements are in a happy colour like blue or green. As I mentioned earlier this is essential for the candlestick variant of the chart. Ultimately we want to be able to build applications that plot real-time data like this;

image

image

Now that you know what we are building, let’s work out how the Rx code is going to work. Hold on to your hat!

 

Generate

First up, we’re going to need a stream of observable prices to play with. I’d like my price stream to start at $5. Then every 10th of a second, I’d like to apply a random adjustment to the price.

var rand = new Random();

var prices = Observable.Generate(
    5d,
    i => i > 0, 
    i => i + rand.NextDouble() - 0.5,
    i => i,
    i => TimeSpan.FromSeconds(0.1)
);

Woah! Let’s go through this slowly.

  • The first parameter (5d) is the initial seed or starting price for our observable sequence.
  • Then we have a break condition (i > 0). We are effectively saying, keep generating prices while the price is greater than zero. If the price drops to zero we can assume that the company is bankrupt.
  • Then we define how we’d like the price to be adjusted (i + rand.NextDouble() - 0.5): by a random value between -0.5 & 0.5.
  • You can then optionally transform the notification, we are not using this feature (i => i).
  • Finally we can supply a time interval TimeSpan.FromSeconds(0.1), telling Generate when we’d like to yield the next iteration in our observable sequence. We could randomize this as well, I’m just going to generate a new price every 0.1 seconds.

Essentially Generate is a parameterised factory method that allows you to “corecursively” define an observable sequence. The function parameters passed into the method define the behaviour (the single next step) of the observable sequence, much like a mathematical series. In functional programming this concept is known as “anamorphism” or “unfold”. Generate is anamorphism for observable sequences!
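If the unfold idea is new to you, it may help to see the same shape over a plain IEnumerable first. The Unfold method below is a hypothetical helper written for this post, not part of LINQ or Rx. It takes the same first four arguments as Generate (seed, condition, iterate, result selector) but leaves out the time selector.

```csharp
using System;
using System.Collections.Generic;

static class Unfolding
{
    // Hypothetical enumerable counterpart of Observable.Generate (no timing).
    public static IEnumerable<TResult> Unfold<TState, TResult>(
        TState seed,
        Func<TState, bool> condition,
        Func<TState, TState> iterate,
        Func<TState, TResult> resultSelector)
    {
        // Keep yielding while the condition holds, stepping the state each time.
        for (var state = seed; condition(state); state = iterate(state))
            yield return resultSelector(state);
    }

    public static void Main()
    {
        // Start at 1, keep going while below 20, double at each step.
        var powers = Unfold(1, i => i < 20, i => i * 2, i => i);
        Console.WriteLine(string.Join(", ", powers)); // 1, 2, 4, 8, 16
    }
}
```

Each argument describes a single step, and the sequence is whatever falls out of repeating that step from the seed.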

If you drop the code in LINQPad, you should see an output like this.

image

If you don’t have LINQPad, you could write a small console application to test this out.

 

Buffer

So now that we have some test data to work with, let’s look at how we can calculate the Open, High, Low & Close (OHLC) prices. Last August I did this using Rx’s Buffer operator. Reactive Extensions has evolved significantly since then. Here is a quote from my old post.

“Rx handles this type of problem perfectly via the “Buffer” operator.”

We had some code similar to this;

from buffer in prices.Buffer(TimeSpan.FromSeconds(1))
select new
{
    Open = buffer.First(),
    High = buffer.Max(),
    Low = buffer.Min(),
    Close = buffer.Last()
}

You can drop this in LINQPad with our test data.

image

This works; however, in retrospect, “perfectly” was a little far from the truth. Let’s take a close look at how buffer works…

As we subscribe to the query, the buffer operator will create a list. All notifications from the underlying source will be placed into this list until the timer elapses. At this point, the observer will be given the list containing all the notifications. We can then interact with this list, in this example, to calculate our OHLC values. This might help you understand how the buffer operator works. I’d recommend pasting this code into LINQPad and having a play around with it.

Code

var source = new Subject<char>();
var timer = Observable.Interval(TimeSpan.FromSeconds(1));
var buffer = source.Buffer(() => timer);
buffer.Subscribe(Console.WriteLine);

source.OnNext('a');
source.OnNext('b');
source.OnNext('c');

Thread.Sleep(1100);

source.OnNext('d');
source.OnNext('e');
source.OnNext('f');
source.OnCompleted();

Pictures

Marble Diagram: Buffer

This approach is fine when we have three notifications per 1 second interval, but what will happen if we have a volatile price and say maybe a 1 minute interval? Our buffer is completely unbounded. This could lead to our short lived notifications being elevated into the generation 2 heap. If the buffer gets big enough, this could even result in a large object heap allocation.

Buffer has its uses but in this case, our open, high, low & close computation, doesn’t need all the values at once. We could perform the same calculation by stashing the first and last values (open & close) and tracking the min & max (high and low) values over the specified interval. What we need is an operator that allows us to react to the values over a window of time.

 

Window

Last December the Rx Team gave us a Christmas present focused around “Programming Streams of Coincidence”. There are some pretty powerful tools in there. If you are interested Lee Campbell has a post covering this family of operators. For this problem, we are interested in Window, which really is perfect for calculating OHLC over an observable stream of prices. Buffer & Window have a few different overloads, but these are the two we are talking about here;

IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan)
IObservable<IObservable<T>> Window<T>(this IObservable<T> source, TimeSpan timeSpan)

So how do these two operators differ? Buffer creates a list at each window opening and passes it to the observer when the window closes. In contrast the Window operator creates a subject at each window opening and passes it to the observer immediately. The subject acts as a conduit, allowing the operator to pipe each notification to the observer until the window closes. Here is the Buffer marble diagram again, this time alongside Window;

Marble Diagram: Buffer VS. Window

Window is more powerful than Buffer as the observer can decide how to process the notifications within the context of the window. This can be achieved by applying SelectMany to the Window operator.

Query Comprehension Syntax

from window in source.Window(timeSpan)
from ? in window.?
select ?

Lambda Syntax

source.Window(timeSpan).SelectMany(window => ?, (x, ?) => ?)

Pictures

Marble Diagram: Window and SelectMany

Interestingly, the Buffer overload we’ve been discussing is actually implemented using this technique. Let’s use this as an example;

Query Comprehension Syntax

// Implement Buffer using Window
public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan)
{
    // Transform IObservable<IObservable<T>> -> IObservable<IList<T>>
    return 
        from window in source.Window(timeSpan)
        from buffer in window.ToList()
        select buffer;
}

Lambda Syntax

source.Window(timeSpan).SelectMany(window => window.ToList())

Pictures

Marble Diagram: Buffer Implemented Using Window

 

You can write any query you want over the window observable. Maybe the observer is only interested in the last value; this is semantically equivalent to Sample;

Query Comprehension Syntax

// Implement Sample using Window
public static IObservable<T> MySample<T>(this IObservable<T> source, TimeSpan timeSpan)
{
    return
        from window in source.Window(timeSpan)
        from last in window.TakeLast(1)
        select last;
}

Lambda Syntax

source.Window(timeSpan).SelectMany(window => window.TakeLast(1))

Pictures

Marble Diagram: Sample Implemented Using Window

 

Aggregate

OK so we have a mechanism that effectively divides a stream of events into smaller streams of events based on windows of time. All we need is a query that we can run over each observable window. Remember Buffer was implemented using ToList? Well ToList is actually implemented using Aggregate!

In fact all sorts of things can be implemented using Aggregate. Remember the Generate (anamorphism) method we used to create our test data? Well it’s actually the dual to Aggregate! While Generate takes a seed and applies some functions to produce a series of notifications, Aggregate takes a series of notifications and recursively applies an accumulator function, until it reaches the end of the sequence, at which point it yields a result. Aggregate is “catamorphism or fold” for observable sequences!

Mathematical Duals: Aggregate, Generate, Catamorphism, Anamorphism, Fold, Unfold

 

We are going to use Aggregate to compute the OHLC values for each Window of prices. Before we do that, let’s just make sure everyone is comfortable.

Aggregate Method Signature

IObservable<TAccumulate> Aggregate<TSource, TAccumulate>(
    this IObservable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> accumulator)

Marble Diagram

Marble Diagram: Aggregate

Confused? Scared? Don’t be…

Let’s look at an example. Here is Sum implemented using Aggregate.

Sum

Observable.Range(1,3).Aggregate(0, (sum, value) => sum + value)

image

As I mentioned earlier, ToList is also implemented using Aggregate.

ToList

Observable.Range(1,3).Aggregate(new List<int>(), (list, value) =>
{
    list.Add(value);
    return list;
})

Marble Diagram: ToList

If you’ve not grokked it, try implementing some other basic operators like Min & Max.
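If you get stuck, here is one possible shape for Max and Min. To keep it runnable without any Rx references, this sketch uses Enumerable.Aggregate over an array rather than the observable version; the seed and accumulator have the same form. FoldDemo is a hypothetical class name, and the choice of int.MinValue / int.MaxValue as seeds is my own assumption, so an empty input returns the seed rather than throwing.

```csharp
using System;
using System.Linq;

static class FoldDemo
{
    // Max as a fold: carry the largest value seen so far.
    public static int Max(int[] values)
    {
        return values.Aggregate(int.MinValue, (max, value) => value > max ? value : max);
    }

    // Min as a fold: carry the smallest value seen so far.
    public static int Min(int[] values)
    {
        return values.Aggregate(int.MaxValue, (min, value) => value < min ? value : min);
    }

    public static void Main()
    {
        var xs = new[] { 3, 1, 2 };
        Console.WriteLine(Max(xs)); // 3
        Console.WriteLine(Min(xs)); // 1
    }
}
```

Notice that each operator only carries a single value of state between notifications, which is exactly the property we want for OHLC.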

 

OHLC (Window + SelectMany + Aggregate)

So can we use Aggregate for our OHLC calculations? Let’s start with the easy stuff by defining the fields we’ll need to keep track of these four values.

class OHLC
{
    public double? Open;
    public double? High;
    public double? Low;
    public double Close;
}

Now we just need a function that takes the current values & a price and produces the new values.

// (TAccumulate, TSource) -> TAccumulate
static OHLC Accumulate(OHLC state, double price)
{
    // Take the current values & apply the price update.    
    state.Open = state.Open ?? price;
    state.High = state.High.HasValue ? state.High > price ? state.High : price : price;
    state.Low = state.Low.HasValue ? state.Low < price ? state.Low : price : price;
    state.Close = price;
    return state;
}

If we bring this together with Window, SelectMany & Aggregate, we’ve now got a query that takes a stream of prices, splits it into windows and calculates OHLC values.

from window in prices.Window(TimeSpan.FromSeconds(1))
from result in window.Aggregate(new OHLC(), Accumulate)
select result
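Because the test prices are random and timer driven, it can be handy to check the accumulator against a fixed set of numbers first. The sketch below runs the same fold over a plain array with Enumerable.Aggregate, standing in for a single window. Ohlc and OhlcFold are hypothetical names for this check, and Accumulate is restated with Math.Max / Math.Min rather than the nested conditionals, which should behave the same way.

```csharp
using System;
using System.Linq;

class Ohlc
{
    public double? Open;
    public double? High;
    public double? Low;
    public double Close;
}

static class OhlcFold
{
    // (TAccumulate, TSource) -> TAccumulate, same contract as in the post.
    public static Ohlc Accumulate(Ohlc state, double price)
    {
        state.Open = state.Open ?? price;  // only the first price sets Open
        state.High = state.High.HasValue ? Math.Max(state.High.Value, price) : price;
        state.Low = state.Low.HasValue ? Math.Min(state.Low.Value, price) : price;
        state.Close = price;               // the latest price always wins
        return state;
    }

    public static void Main()
    {
        // One hand-written "window" of prices.
        var window = new[] { 5.0, 5.4, 4.9, 5.2 };
        var bar = window.Aggregate(new Ohlc(), Accumulate);

        // Expect open 5.0, high 5.4, low 4.9, close 5.2.
        Console.WriteLine("O={0} H={1} L={2} C={3}", bar.Open, bar.High, bar.Low, bar.Close);
    }
}
```

Only one Ohlc instance is alive per window, however many prices arrive, which is the memory behaviour we were after when moving away from Buffer.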

image

 

For readers who are not using LINQPad, I’ve written a console application. You can download it here;

Console Application Code

using System;

namespace ConsoleApplication124
{
    using System.Reactive.Linq;

    class Program
    {
        static void Main()
        {
            var rand = new Random();
            var prices = Observable.Generate(
                5d, i => i > 0, i => i + rand.NextDouble() - 0.5, i => i, i => TimeSpan.FromSeconds(0.1));

            var query = from window in prices.Window(TimeSpan.FromSeconds(1))
                        from result in window.Aggregate(new Ohlc(), Accumulate)
                        select result;
            query.Subscribe(Console.WriteLine);
            Console.ReadLine();
        }

        class Ohlc
        {
            public double? Open;
            public double? High;
            public double? Low;
            public double Close;

            public override string ToString()
            {
                return (new { Open, High, Low, Close }).ToString();
            }
        }

        static Ohlc Accumulate(Ohlc current, double price)
        {
            current.Open = current.Open ?? price;
            current.High = current.High.HasValue ? current.High > price ? current.High : price : price;
            current.Low = current.Low.HasValue ? current.Low < price ? current.Low : price : price;
            current.Close = price;
            return current;
        }
    }
}

Plotting The Results

Finally, we’re going to plot these results using the charting controls that come with .NET 4.0. Unfortunately/Strangely these controls are only available for Windows Forms developers. It appears that they will be available in a future version of WPF, there is a preview release available here. Alternatively there are lots of 3rd party charting packages that offer similar functionality. I’ll try these out in a future post. For now I’m going to use the Windows Forms controls, conceptually there shouldn’t be much of a difference.

First let’s prepare the project.

1. Create a new Windows Forms project.

image

2. Add references to Reactive Extensions.

image

Your references should now look something like this;

image

3. Drop a Chart control onto the form.

image

4. I’m going to get rid of the Legend.

image

Click remove.

image

 

The Code

In the introduction I talked about two different chart types. Both are supported by this control library.

series.ChartType = SeriesChartType.Candlestick;
series.ChartType = SeriesChartType.Stock;

For testing purposes, I’m going to set the resolution of the time axis so that it will work with 1 second intervals. A window that small isn’t useful in the real world, the smallest I’ve seen is 1 minute, but we want to see some results straight away.

series.XValueType = ChartValueType.Time;
var area = chart1.ChartAreas[0];
area.Axes[0].Title = "Time";
area.AxisX.LabelStyle.IntervalType = DateTimeIntervalType.Seconds;
area.AxisX.LabelStyle.Format = "T";

Finally we need to subscribe to our query & test data and populate the chart.

query.ObserveOn(this).Subscribe(x => series.Points.AddXY(DateTime.Now, x.High, x.Low, x.Open, x.Close));

If you put all this code in the form’s constructor and run the project, you should have something like this;

using System;
using System.Windows.Forms;
using System.Reactive.Linq;
using System.Windows.Forms.DataVisualization.Charting;

namespace Chart
{
    public partial class Form1 : Form
    {

        public Form1()
        {
            InitializeComponent();

            // Configure the chart
            var series = chart1.Series[0];
            series.ChartType = SeriesChartType.Candlestick;
            series.XValueType = ChartValueType.Time;
            var area = chart1.ChartAreas[0];
            area.Axes[0].Title = "Time";
            area.AxisX.LabelStyle.IntervalType = DateTimeIntervalType.Seconds;
            area.AxisX.LabelStyle.Format = "T";

            
            // Test prices
            var rand = new Random();
            var prices = Observable.Generate(5d, i => i > 0, i => i + rand.NextDouble() - 0.5, i => i, i => TimeSpan.FromSeconds(0.1));

            // OHLC query
            var query =
                from window in prices.Window(TimeSpan.FromSeconds(1))
                from ohlc in window.Aggregate(new OHLC(), Accumulate)
                select ohlc;

            // Subscribe & display results
            query.ObserveOn(this).Subscribe(x => series.Points.AddXY(DateTime.Now, x.High, x.Low, x.Open, x.Close));

        }

        class OHLC
        {
            public double? Open;
            public double? High;
            public double? Low;
            public double Close;
        }

        static OHLC Accumulate(OHLC current, double price)
        {
            current.Open = current.Open ?? price;
            current.High = current.High.HasValue ? current.High > price ? current.High : price : price;
            current.Low = current.Low.HasValue ? current.Low < price ? current.Low : price : price;
            current.Close = price;
            return current;
        }
    }
}

Run the application;

image

 

Additionally we can apply a suitable colour scheme to the chart;

// Colours
chart1.BackColor = Color.Black;
chart1.ChartAreas[0].Axes[0].LineColor = Color.LimeGreen;
chart1.ChartAreas[0].Axes[0].TitleForeColor = Color.LimeGreen;
chart1.ChartAreas[0].AxisX.MajorTickMark.LineColor = Color.LimeGreen;
chart1.ChartAreas[0].AxisX.LabelStyle.ForeColor = Color.LimeGreen;
chart1.ChartAreas[0].Axes[1].LineColor = Color.LimeGreen;
chart1.ChartAreas[0].Axes[1].TitleForeColor = Color.LimeGreen;
chart1.ChartAreas[0].AxisY.MajorTickMark.LineColor = Color.LimeGreen;
chart1.ChartAreas[0].AxisY.LabelStyle.ForeColor = Color.LimeGreen;
chart1.ChartAreas[0].BackColor = Color.Black;
series["PriceDownColor"] = "Red";

image

 

Download

You can get a working demo application here. As discussed I will provide a WPF based solution soon.

I’ve tried to make this article useful for both readers that know Rx but nothing about finance, or know finance but are not familiar with Rx. I’m not sure if it really works so I’m interested to hear feedback from both camps.

More soon.

New Rxx Release (It’s a big one!)

We couldn’t let Rx go live without an Rxx refresh and boy is it a big one! Dave Sexton & I are proud to announce Rxx 1.1. Firstly let me congratulate Dave. This is a major milestone for us and he has put in a huge effort, much of which has been enhancing our build process. Rxx is now available in the following configurations!

Build Configurations

  • .NET Stable
  • .NET Experimental
  • Silverlight Stable
  • Silverlight Experimental
  • Windows Phone 7 Stable
  • Windows Phone 7 Experimental

Dave has spent the last month mirroring the Rx team’s build process, including documentation & labs, ported to their respective platforms. I guess we’re going to have to support XNA next! Of course there would be no point in doing all this if we didn’t have some useful features. Don’t worry there are plenty of new ones.

Major New Features

  • Rx Parsers
  • Rx Dns
  • Rx Smtp Clients
  • Rx Sockets
  • Rx Web Requests
  • Rx Network Changes
  • Rx Ping
  • A new type of Multicasting that allows state to be cleared when the observable goes “cold”.

Technologies

Along with our new build process, we’ve incorporated a range of technologies to ensure that Rxx remains a high quality project;

  • Code Contracts
  • StyleCop Analysis
  • FxCop Analysis
  • Sandcastle Documentation
  • MS Test

Additionally Dave Sexton’s Labs abstraction means that our interactive labs run on each platform.

What’s next?

I’m really proud to be part of a project that is shaping up to be one of the most professional looking open source projects I’ve seen. The work that’s gone into our project in this release gives us a great platform to build on going forward. Head over to the project page & as always, we are eagerly awaiting any feedback or ideas you might have.


Rx Performance Improvements

Because Rx is a technology that enables us to easily write multithreaded, concurrent applications, a question often asked is;

“how fast is this stuff?”

As Rx completes its metamorphosis from “labs project” to “official product”, it seems the team has started focusing on performance. This is probably quite normal as the project shifts from designing the perfect API to supporting a stable product.

I thought I’d share some benchmarks on where some of these improvements have been made.

 

Subjects

Subjects are used everywhere in Rx, so they are a good candidate for optimisation, giving us across the board performance gains. I’m testing the throughput by publishing 1 million notifications.

// Subject throughput performance test
var subject = new Subject<int>();
subject.Subscribe(_ => { });
var sw = Stopwatch.StartNew();
for (int i = 0; i < 1000000; i++)
{
    subject.OnNext(i);
}
Console.WriteLine(sw.Elapsed);

Results

1.0.2698.104 – 2.38 seconds

1.0.2856.104 (FastSubject) – 0.12 seconds

1.0 Stable – 0.04 seconds

image

The performance increase is due to two factors. As Wes & Bart explained in their latest video, they have removed the responsibility of scheduling from the subject implementations. This accounts for the improvement between 2698 & the 2856 FastSubject. The next improvement is due to the reduction in memory allocation. Subjects used to make a copy of the observers array before exiting the lock and publishing the notification to all the observers. This has been replaced with an immutable list that can be safely accessed outside of the lock. This immutable list is replaced wholesale when observers subscribe/unsubscribe, on the basis that throughput is more important than the subscribe & unsubscribe operations. This makes sense if you think about it; it’s really just how multicast delegates work. Hmmm, more improvements to come?

 

Schedulers

Also discussed in the video are the changes to the IScheduler interface. I’m testing the scenario where we want to adapt an enumerable into an observable using the NewThreadScheduler.

var sw = Stopwatch.StartNew();
Enumerable.Range(1, 10000)
    .ToObservable(Scheduler.NewThread)
    .Count()
    .Single();
Console.WriteLine(sw.Elapsed);

Results

1.0.2856.104 – 3.43 seconds

v1.0 Stable – 0.14 seconds

image

The reason for this gain is pretty simple. The old scheduling API didn’t provide the action that was being executed with any contextual information about the thread it was running on. This meant that for NewThreadScheduler to work properly, ToObservable would have had to be hard-wired to create a dedicated event loop when the caller subscribes. But that would fly in the face of the whole idea of parameterising concurrency. The IScheduler interface has been overhauled to accommodate scenarios like this.

Was

public interface IScheduler
{
    IDisposable Schedule(Action action);
    IDisposable Schedule(Action action, TimeSpan dueTime);
    DateTimeOffset Now { get; }
}

Now

public interface IScheduler
{
    IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action);
    IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action);
    IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action);
    DateTimeOffset Now { get; }
}

Don’t panic, you probably won’t ever need to implement this guy, and extension methods allow you to keep using the old API (which I really like!), for example;

Scheduler.NewThread.Schedule(() => DoStuff());

 

ObserveOn

This wasn’t discussed in the video but I thought it was worth mentioning. ObserveOn has been redesigned so that it makes use of the SchedulerObserver<T>. I always found it strange that this serialization code was duplicated. They’ve addressed that. Shame SchedulerObserver<T> has not been made public though. I think it would be a useful building block for people building their own operators. Anyway, the test code;

var sw = Stopwatch.StartNew();
Observable.Range(1,10000)
    .ObserveOn(Scheduler.ThreadPool)
    .Count()
    .Single();
Console.WriteLine(sw.Elapsed);

Results

1.0.2856.104 – 0.8 seconds

v1.0 Stable – 0.2 seconds

image

I can’t help but feel that there is still more we can squeeze out of ObserveOn. I’ve been playing around with some alternate APIs for this. They will probably find their way into Rxx shortly. More on this soon.

 

Conclusion

The last few builds have yielded some great improvements, and I’m sure there are more to come. If you run into any performance issues I’d encourage you to hit the forums. More soon.

Rx Official Release

Reactive Extensions: It’s Official & Interactive Extensions Returns!

Yay! Version 1.0 of Reactive Extensions has just been officially released!

http://social.msdn.microsoft.com/Forums/en/rx/thread/57017698-d6c9-4434-bffe-9c49363b3c2f

Additionally, Interactive Extensions (Ix) is now packaged separately as its own experimental release!
