Reactive softaware

Software developers are interesting people. Everyone strives to become as good at their job as possible, but this seems to be especially true for software developers. At softaware we improve our programming skills not only through the projects we work on but also through internal training sessions from time to time. Everyone working here can become a trainer, and is encouraged to; all you need is enthusiasm for something you want to share with your colleagues.

The thing I’m enthusiastic about is Reactive Programming. I first heard about it about five years ago, but at that time I didn’t understand how to use it properly. It just seemed to be a paradigm that might be useful in some rare situations but not applicable in general. Two years later I joined a team that had been working on a WPF application for some time. Reactive Programming was a core part of that project, so I was forced to learn and understand it, and I finally saw that it was not just some weird paradigm: it really helped solve problems, and it solved them in a very nice way.

I couldn’t resist, so I took the chance to introduce my colleagues to Reactive Programming. A big thank you goes to Roman, because it’s not a given that the whole company gets to spend time on something that doesn’t pay off immediately. But enough non-tech talk, let’s dive into the exciting world of Reactive Programming.

What is Reactive Programming?

The term reactive sounds a lot like it has to do with handling events, right? That’s true, but Reactive Programming is much more than that. It’s about representing any change that can happen within a system, and basically every system is constantly changing, be it a text box value, some database entries or just a simple object property.

There are many nice introductions to Reactive Programming, so I’ll just give a short overview of what Reactive Programming looks like for a .NET developer.

Reactive Programming in .NET

There are two interfaces in the .NET BCL that are heavily used for Reactive Programming: System.IObservable<T> and System.IObserver<T>.

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<in T>
{
    void OnNext(T value);
    void OnError(Exception error);
    void OnCompleted();
}

IObservable<T> represents data that can change over time (e.g. sensor values, mouse location) or events that occurred (e.g. button click, timer elapsed).
IObserver<T> is the sink for this data (OnNext is like an event handler).

An observer can subscribe to an observable and unsubscribe from it again (by disposing the returned IDisposable). While subscribed, the observable notifies the observer about new data (OnNext) and about the termination of the observable sequence, either successfully (OnCompleted) or due to an error (OnError).
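To make this contract concrete, here is a minimal hand-written pair that uses only the two BCL interfaces, without Rx.NET. CountdownObservable and ListObserver<T> are hypothetical names chosen for illustration; the observable is deliberately synchronous and ignores concurrency, which a real implementation would have to handle.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable: pushes start, start-1, ..., 1 to each subscriber, then completes.
public sealed class CountdownObservable : IObservable<int>
{
    private readonly int _start;

    public CountdownObservable(int start) => _start = start;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Synchronous for simplicity: all values are pushed during Subscribe.
        for (var i = _start; i > 0; i--)
        {
            observer.OnNext(i);
        }
        observer.OnCompleted();

        // Nothing is left to cancel, so unsubscribing is a no-op.
        return NoopDisposable.Instance;
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer: records everything the observable tells it.
public sealed class ListObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => Error = error;
    public void OnCompleted() => Completed = true;
}
```

Subscribing a ListObserver<int> to new CountdownObservable(3) leaves Values as 3, 2, 1 and Completed as true.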

What’s the difference from .NET events?

We noted above that IObserver<T>.OnNext is like an event handler, so why can’t we do the same with plain old .NET events? Because a .NET event is not a first-class value: there is no type that represents the stream of events. That means we can’t, for example, pass INotifyPropertyChanged.PropertyChanged around, filter its data or combine it with other events before attaching a handler.
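As an illustration, the following sketch turns a .NET event into an IObservable<T> with Rx.NET’s Observable.FromEvent and then hands the resulting stream to a method that filters it before anyone subscribes. It assumes the System.Reactive NuGet package; Thermometer, TemperatureChanged and EventAsValueDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical event source used for illustration.
public sealed class Thermometer
{
    public event Action<double> TemperatureChanged;

    public void Report(double celsius) => TemperatureChanged?.Invoke(celsius);
}

public static class EventAsValueDemo
{
    // The event stream becomes an ordinary value of type IObservable<double>.
    public static IObservable<double> Temperatures(Thermometer thermometer) =>
        Observable.FromEvent<double>(
            handler => thermometer.TemperatureChanged += handler,
            handler => thermometer.TemperatureChanged -= handler);

    // ...so it can be passed to a method that filters it before anyone subscribes.
    public static IObservable<double> Warnings(IObservable<double> temperatures, double threshold) =>
        temperatures.Where(t => t > threshold);
}
```

The handler is only attached to the event while someone is subscribed; disposing the subscription detaches it again.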

Similarities between IObservable<T> and IEnumerable<T>

As a .NET developer you most likely know IEnumerable<T>, which represents a sequence of data. It declares a single method, GetEnumerator, yet its instances can be filtered, combined and much more. The same holds for IObservable<T>: although it only declares a single method, Subscribe, it enables sophisticated operations. The key is special implementations of these interfaces that wrap zero or more inner instances and use the data of those inner instances to produce their own data. For example, a very rudimentary implementation of a filter operator might look like this:

/// <summary>
/// Filter data from an underlying observable sequence using a filter function.
/// </summary>
internal class FilterObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _inner;
    private readonly Func<T, bool> _filter;

    public FilterObservable(IObservable<T> inner, Func<T, bool> filter)
    {
        _inner = inner;
        _filter = filter;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // For simplicity we assume that we have an overload of
        // `IObservable<T>.Subscribe` that can construct an observer
        // using the three required functions.
        return _inner
            .Subscribe(
                onNext: data =>
                {
                    if (_filter(data))
                    {
                        observer.OnNext(data);
                    }
                },
                onError: observer.OnError,
                onCompleted: observer.OnCompleted);
    }
}

public static class ObservableExtensions
{
    /// <summary>
    /// Extension method to enable operator chains.
    /// Example usage: <code>observable.Filter(i => i > 10);</code>.
    /// </summary>
    public static IObservable<T> Filter<T>(this IObservable<T> o, Func<T, bool> filter)
    {
        return new FilterObservable<T>(o, filter);
    }
}
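The filter above relies on an assumed lambda-based Subscribe overload. Without that assumption, the usual BCL-only approach is a dedicated observer that wraps the downstream observer. The sketch below shows this; ExplicitFilterObservable<T>, ListObservable<T> and CollectingObserver<T> are hypothetical names, and the latter two exist only so the filter can be tried out.

```csharp
using System;
using System.Collections.Generic;

// Variant of FilterObservable<T> without the assumed Subscribe overload:
// a dedicated observer forwards only the values that pass the filter.
internal sealed class ExplicitFilterObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _inner;
    private readonly Func<T, bool> _filter;

    public ExplicitFilterObservable(IObservable<T> inner, Func<T, bool> filter)
    {
        _inner = inner;
        _filter = filter;
    }

    public IDisposable Subscribe(IObserver<T> observer) =>
        _inner.Subscribe(new FilterObserver(observer, _filter));

    private sealed class FilterObserver : IObserver<T>
    {
        private readonly IObserver<T> _downstream;
        private readonly Func<T, bool> _filter;

        public FilterObserver(IObserver<T> downstream, Func<T, bool> filter)
        {
            _downstream = downstream;
            _filter = filter;
        }

        public void OnNext(T value)
        {
            if (_filter(value))
            {
                _downstream.OnNext(value);
            }
        }

        // Termination is forwarded unchanged.
        public void OnError(Exception error) => _downstream.OnError(error);
        public void OnCompleted() => _downstream.OnCompleted();
    }
}

// Hypothetical helper: replays a list synchronously, then completes.
internal sealed class ListObservable<T> : IObservable<T>
{
    private readonly IReadOnlyList<T> _items;

    public ListObservable(IReadOnlyList<T> items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items)
        {
            observer.OnNext(item);
        }
        observer.OnCompleted();
        return EmptyDisposable.Instance;
    }

    private sealed class EmptyDisposable : IDisposable
    {
        public static readonly EmptyDisposable Instance = new EmptyDisposable();
        public void Dispose() { }
    }
}

// Hypothetical helper: collects everything it receives.
internal sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => Error = error;
    public void OnCompleted() => Completed = true;
}
```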

The .NET BCL only defines System.IObservable<T> and System.IObserver<T>; it doesn’t provide any operators for them. It’s up to library authors to implement useful operators, and in .NET the de facto standard library for this is Rx.NET (the System.Reactive package). Still, in a real application you will most likely implement custom operators as well, and Rx.NET helps a lot here. For a filter operator (which Rx.NET already provides as Where) this is trivial:

using System;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    public static IObservable<T> Filter<T>(this IObservable<T> o, Func<T, bool> filter)
    {
        return Observable.Create<T>(observer =>
        {
            // This is the same as `FilterObservable<T>.Subscribe` which we saw earlier
            return o
                .Subscribe(
                    onNext: data =>
                    {
                        if (filter(data))
                        {
                            observer.OnNext(data);
                        }
                    },
                    onError: observer.OnError,
                    onCompleted: observer.OnCompleted);
        });
    }
}
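Observable.Create is most useful for operators that Rx.NET doesn’t ship. As a sketch, assuming the System.Reactive package, here is a hypothetical operator called Pairwise that emits each value together with its predecessor; the name and shape are illustrative, not part of Rx.NET.

```csharp
#nullable enable
using System;
using System.Reactive.Linq;

public static class PairwiseExtensions
{
    /// <summary>
    /// Hypothetical custom operator: emits (previous, current) for every value after the first.
    /// </summary>
    public static IObservable<(T Previous, T Current)> Pairwise<T>(this IObservable<T> source)
    {
        return Observable.Create<(T Previous, T Current)>(observer =>
        {
            // State lives inside the Create callback, so every subscriber gets its own copy.
            var hasPrevious = false;
            T previous = default!;

            return source.Subscribe(
                onNext: value =>
                {
                    if (hasPrevious)
                    {
                        observer.OnNext((previous, value));
                    }
                    previous = value;
                    hasPrevious = true;
                },
                onError: observer.OnError,
                onCompleted: observer.OnCompleted);
        });
    }
}
```

Keeping previous and hasPrevious inside the Create callback rather than in a field is the important design choice: two subscribers to the same Pairwise observable must not share state.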

A hopefully compelling example

After the purely theoretical introduction we needed a practical example to discuss. I prepared a simple WPF application that implements the MVVM pattern and uses Rx.NET inside the view models. The sample application can be downloaded from GitHub and looks like this:

When Reactive Programming is applied correctly, most of your view model logic (i.e. how properties are related to each other) will be in the constructor of the view model. So instead of imperatively writing how property B is updated inside the setter of property A, with Reactive Programming you declare how a change of A leads to a new value of B. That may sound like a trivial difference, but it gets quite interesting when more properties are involved. It also keeps the property setters clean.

The example application contains a MainViewModel. Inside its constructor, the first statement describes how a change to MainViewModel.SearchText leads to a new value for MainViewModel.Persons. Although asynchronous computations are involved, there is no async void method anywhere. If you calculated the value inside a property setter instead, you would need either an async void method or some other dirty workaround.
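The sample’s actual code lives on GitHub and isn’t reproduced here, so the following is only a sketch of such a statement under assumptions: search results are plain strings, the search function and the 300 ms throttle are hypothetical, PersonSearch.Persons is an illustrative name, and the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PersonSearch
{
    // Hypothetical sketch of "a change to SearchText leads to a new value for Persons".
    // searchText: the stream of SearchText values.
    // search: a hypothetical async lookup (repository, web API, ...).
    public static IObservable<IReadOnlyList<string>> Persons(
        IObservable<string> searchText,
        Func<string, CancellationToken, Task<IReadOnlyList<string>>> search,
        IScheduler scheduler)
    {
        return searchText
            // Wait until the user has stopped typing for 300 ms.
            .Throttle(TimeSpan.FromMilliseconds(300), scheduler)
            .DistinctUntilChanged()
            // Wrap each async call in an observable instead of an async void handler.
            .Select(text => Observable.FromAsync(ct => search(text, ct)))
            // Only the latest search matters: Switch unsubscribes from the previous
            // call when a newer search text arrives, which cancels its token.
            .Switch();
    }
}
```

In a view model, the result would then be subscribed to and assigned to the Persons property, typically after marshalling back to the UI thread with ObserveOn. Passing the scheduler in explicitly is what makes the throttle testable with virtual time.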

The second statement in the constructor shows how the persons relate to their average age; there isn’t really anything new here. The only interesting part is how you get an observable of Person.Age and how you combine a sequence of those into the current average age.
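One way to do this with Rx.NET is sketched below. It assumes a hypothetical Person whose age is backed by a BehaviorSubject<int> (the sample may expose changes differently, e.g. via INotifyPropertyChanged) and a hypothetical AverageAge.From helper; CombineLatest and Switch are real Rx.NET operators.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical person whose age can be observed.
public sealed class Person
{
    private readonly BehaviorSubject<int> _age;

    public Person(int age) => _age = new BehaviorSubject<int>(age);

    public int Age
    {
        get => _age.Value;
        set => _age.OnNext(value);
    }

    // Emits the current age on subscription and every change afterwards.
    public IObservable<int> AgeChanges => _age.AsObservable();
}

public static class AverageAge
{
    public static IObservable<double> From(IObservable<IReadOnlyList<Person>> persons)
    {
        return persons
            .Select(list => list.Count == 0
                ? Observable.Return(0.0)
                // CombineLatest emits the latest age of every person whenever one of them changes.
                : list.Select(p => p.AgeChanges).CombineLatest().Select(ages => ages.Average()))
            // A new person list replaces the subscriptions to the previous persons' ages.
            .Switch();
    }
}
```

For two persons aged 20 and 30 the sequence starts with 25; changing the first age to 40 produces 35.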

The third statement shows how an ICommand can also be used as a stream of data. The first thing you’ll notice is that we pass in an observable for its CanExecute logic, so we neither rely on the CommandManager nor have to find every place where the CanExecuteChanged event should be raised. The execute method calculates a value asynchronously. Because the command implements IObservable<T>, you can use this value to calculate another property, and you can apply all the other operators to it as well.
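The sketch below shows the idea behind such a command; it is not the sample’s implementation. ObservableCommand<T> is a hypothetical name, it assumes the System.Reactive package for Subject<T>, and, unlike the command described above, it executes synchronously and skips error handling to stay short.

```csharp
#nullable enable
using System;
using System.Reactive.Subjects;
using System.Windows.Input;

// Hypothetical minimal command: CanExecute is driven by an IObservable<bool>, and the
// command itself is an IObservable<T> that publishes the result of each execution.
public sealed class ObservableCommand<T> : ICommand, IObservable<T>, IDisposable
{
    private readonly Func<object?, T> _execute;
    private readonly Subject<T> _results = new Subject<T>();
    private readonly IDisposable _canExecuteSubscription;
    private bool _canExecute;

    public ObservableCommand(IObservable<bool> canExecute, Func<object?, T> execute)
    {
        _execute = execute;
        _canExecuteSubscription = canExecute.Subscribe(value =>
        {
            if (value == _canExecute)
            {
                return;
            }
            _canExecute = value;
            // No CommandManager needed: the observable says exactly when this changes.
            CanExecuteChanged?.Invoke(this, EventArgs.Empty);
        });
    }

    public event EventHandler? CanExecuteChanged;

    public bool CanExecute(object? parameter) => _canExecute;

    public void Execute(object? parameter)
    {
        if (_canExecute)
        {
            _results.OnNext(_execute(parameter));
        }
    }

    // Subscribers receive every execution result.
    public IDisposable Subscribe(IObserver<T> observer) => _results.Subscribe(observer);

    public void Dispose()
    {
        _canExecuteSubscription.Dispose();
        _results.OnCompleted();
        _results.Dispose();
    }
}
```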

Testing reactive code

Because Reactive Programming embraces asynchronous code, one might think that reactive code is hard to test. That may be true in some cases, but carefully crafted reactive statements are quite easy to test. This is because the timing and concurrency of Rx.NET’s operators go through implementations of System.Reactive.Concurrency.IScheduler, and operators such as Throttle accept the scheduler to use as a parameter. Every scheduler has its own notion of time. For testing there’s a special scheduler (Microsoft.Reactive.Testing.TestScheduler) whose time can be controlled from the outside. Let’s look at an example test that uses the Throttle operator.

using System;
using System.Reactive.Linq;
using FluentAssertions;
using Microsoft.Reactive.Testing;
using Xunit;
using static Microsoft.Reactive.Testing.ReactiveTest;

public class ThrottleTests
{
    [Fact]
    public void ThrottleShouldWork()
    {
        var scheduler = new TestScheduler();

        var now = DateTime.UtcNow;

        var observable = scheduler
            .CreateHotObservable(
                OnNext(now.AddSeconds(1).Ticks, "s"),
                OnNext(now.AddSeconds(2).Ticks, "se"),
                OnNext(now.AddSeconds(3).Ticks, "sea"),
                // Arrives one tick after the throttle window of "sea" (3 s + 1 s) has closed.
                OnNext(now.AddSeconds(4).Ticks + 1, "sear"),
                OnNext(now.AddSeconds(5).Ticks, "searc"),
                OnNext(now.AddSeconds(6).Ticks, "search"))
            .Throttle(TimeSpan.FromSeconds(1), scheduler);

        var observer = scheduler
            .Start(
                () => observable,
                created: 0,
                subscribed: now.Ticks,
                disposed: now.AddSeconds(10).Ticks);

        observer
            .Messages
            .Should()
            .Equal(
                OnNext(now.AddSeconds(4).Ticks, "sea"),
                OnNext(now.AddSeconds(7).Ticks, "search"));
    }
}

We use TestScheduler to create a hot observable that produces items at specific virtual times, e.g. “se” two seconds from now. Then we throttle this sequence so that an item is only passed through when no new item arrived for at least one second (note that we need to pass the scheduler to Throttle). Then we run the observable: it is created at virtual time 0, subscribed to at now, and the subscription is disposed when the scheduler’s clock says it’s ten seconds from now. scheduler.Start returns a test observer that records which messages it received, and we check these against our expectation.

If you run the test, you’ll find that it completes within a few milliseconds. This is because TestScheduler uses virtual time: instead of actually waiting, it advances its clock directly to the next scheduled item.
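The same mechanism works outside of TestScheduler.Start. As a small sketch, assuming the System.Reactive and Microsoft.Reactive.Testing packages (VirtualTimeDemo.FiveTicks is a hypothetical name), five seconds of a one-second Interval run instantly:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class VirtualTimeDemo
{
    // Collects the first five values of a one-second interval without waiting five seconds.
    public static IList<long> FiveTicks()
    {
        var scheduler = new TestScheduler();
        var values = new List<long>();

        // The scheduler is passed explicitly, so no real timer is involved.
        Observable.Interval(TimeSpan.FromSeconds(1), scheduler)
            .Take(5)
            .Subscribe(values.Add);

        // Runs all scheduled work in virtual time until nothing is left to do.
        scheduler.Start();
        return values;
    }
}
```

For view model code to benefit from this, it has to accept an IScheduler (e.g. via its constructor) and pass it on to time-based operators, so production code can use a real scheduler while tests inject a TestScheduler.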

Error handling

The contract observables fulfill is that they end either successfully or due to an error. One reason an observable ends due to an error is an unhandled exception thrown within an operator, e.g. source.Where(p => throw new Exception("boom")). In this case IObserver<T>.OnError is called and the subscription to the observable is disposed. In general that’s not what you want: it’s fine to have the error handled in IObserver<T>.OnError, but oftentimes you’ll want the subscription to stay alive. Unfortunately, every operator in Rx.NET enforces this contract, so you can’t change that behavior without reimplementing all operators. Instead, you have to carefully catch all possible exceptions and decide individually what to do with each of them. Some options are easy to implement because there are built-in operators for them (e.g. swallowing with Catch, retrying with Retry), others are harder (e.g. letting the subscriber handle them) because there’s no built-in support.
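A common Rx.NET pattern for the swallowing option is to give every item its own inner sequence, so that only that inner sequence fails and Catch can replace it, while the outer subscription keeps running. The sketch below assumes the System.Reactive package; ErrorHandlingDemo and its methods are hypothetical names, and this is not necessarily how the sample application handles errors.

```csharp
using System;
using System.Reactive.Linq;

public static class ErrorHandlingDemo
{
    // Unguarded: the first unparsable item terminates the whole sequence with OnError.
    public static IObservable<int> ParseAll(IObservable<string> source) =>
        source.Select(text => int.Parse(text));

    // Guarded: each item is parsed inside its own inner sequence. Defer turns an
    // exception thrown while parsing into OnError of that inner sequence only, and
    // Catch replaces a failed parse with an empty sequence, i.e. the item is skipped.
    public static IObservable<int> ParseOrSkip(IObservable<string> source) =>
        source
            .Select(text => Observable
                .Defer(() => Observable.Return(int.Parse(text)))
                .Catch((FormatException _) => Observable.Empty<int>()))
            // Concat flattens the inner sequences while preserving their order.
            .Concat();
}
```

The same per-item structure also works for other strategies: replacing Catch with Retry(n) on the inner sequence retries only the failing item.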

tl;dr

I had the pleasure of introducing my colleagues at softaware to Reactive Programming in general and to Rx.NET in particular. It was a very nice experience because the topic raised a lot of interesting questions, concerns and uncertainties. I hope I could clarify most of them and got some colleagues interested in learning more about Reactive Programming, and maybe in using it in one of their future projects. After all, as Confucius taught us: “I hear and I forget. I see and I remember. I do and I understand.”