Rx.NET in the real world

William Barbosa
5 min read · Sep 20, 2019

Lessons that can help you when marbles can’t

📝 Naming is hard

Because of decisions the C# team made while designing LINQ, Rx.NET is a one-of-a-kind flavor of Reactive Extensions. Its basic operators are named after their LINQ counterparts instead of following the usual Rx conventions. This can lead to some confusion, especially if your team learned Rx in another language.

For completeness' sake, here are the C# method names and their “normal” Rx names. If you wanna use the “normal” names, I’ve created a gist that adds those methods as extensions:
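As a rough illustration (this is a sketch, not the contents of the gist), the most common mappings are Select for map, Where for filter, SelectMany for flatMap and Aggregate for reduce. The class name RxAliasExtensions and the alias names Map, Filter, FlatMap and Reduce are assumptions made up for this example; each one only forwards to the existing Rx.NET operator.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical aliases: each method simply forwards to the LINQ-style Rx.NET operator.
public static class RxAliasExtensions
{
    // map -> Select
    public static IObservable<TResult> Map<T, TResult>(
        this IObservable<T> source, Func<T, TResult> selector)
        => source.Select(selector);

    // filter -> Where
    public static IObservable<T> Filter<T>(
        this IObservable<T> source, Func<T, bool> predicate)
        => source.Where(predicate);

    // flatMap -> SelectMany
    public static IObservable<TResult> FlatMap<T, TResult>(
        this IObservable<T> source, Func<T, IObservable<TResult>> selector)
        => source.SelectMany(selector);

    // reduce -> Aggregate (emits a single value once the source completes)
    public static IObservable<TAcc> Reduce<T, TAcc>(
        this IObservable<T> source, TAcc seed, Func<TAcc, T, TAcc> accumulator)
        => source.Aggregate(seed, accumulator);
}
```

With these in scope, Observable.Range(1, 4).Filter(x => x % 2 == 0).Map(x => x * 10) reads the way it would in most other Rx implementations.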

Then there are methods that sound like they do the same thing but in reality behave slightly differently, leading to even further runtime confusion.

The cherry on top is the SubscribeOn and ObserveOn methods, which are so confusing I sometimes catch myself looking at their documentation.
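One way to keep the two apart is to record which thread does what. The sketch below is only an illustration: SchedulingDemo and Run are hypothetical names, and NewThreadScheduler is used simply because it guarantees a thread other than the caller's. SubscribeOn moves the subscription logic (the body of Observable.Create) to the scheduler, while ObserveOn leaves the subscription where it is and moves the delivery of notifications.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulingDemo
{
    // Returns the thread that ran the subscription logic and the thread that received OnNext.
    public static (int SubscribeThread, int ObserveThread) Run(
        Func<IObservable<int>, IObservable<int>> applyScheduling)
    {
        var subscribeThread = -1;
        var observeThread = -1;

        var source = Observable.Create<int>(observer =>
        {
            // Runs wherever the subscription happens.
            subscribeThread = Thread.CurrentThread.ManagedThreadId;
            observer.OnNext(1);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        applyScheduling(source)
            // Runs wherever notifications are delivered.
            .Do(_ => observeThread = Thread.CurrentThread.ManagedThreadId)
            .Wait(); // Blocks until the sequence completes.

        return (subscribeThread, observeThread);
    }
}
```

Calling Run(o => o.SubscribeOn(NewThreadScheduler.Default)) reports a subscription thread different from the caller's, whereas Run(o => o.ObserveOn(NewThreadScheduler.Default)) subscribes on the caller's thread and only delivers the value elsewhere.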

😫 Threading is harder

Reactive Extensions are, by nature, thread agnostic. By default, there is no guarantee about which thread the work happens on. This is what the IScheduler interface is for. Out of the box, we get a couple of useful and very well named schedulers, like CurrentThreadScheduler, NewThreadScheduler, SynchronizationContextScheduler and ThreadPoolScheduler, to name a few.

As useful as these are, they don’t help us with a problem common to interface developers: all work that updates the UI is meant to run on a special thread, the Main Thread.

My team tried leveraging both the CurrentThreadScheduler and the SynchronizationContextScheduler to behave as though they were a MainThreadScheduler. We did so by ensuring that Observables were only ever created from the constructors of ViewModels, which in turn were always created in the main thread.

This approach had two flaws. The first is that sometimes we needed to create our observables on different threads. In such scenarios, we found ourselves doing all sorts of jerry-rigs to work around the limitation of single-thread observable creation. The other problem was testing: our code was tied to the current-thread schedulers, but tests needed a TestScheduler instead.

We finally found a solution to this problem by drawing inspiration from RxSwift’s Driver. A driver is like an observable, but it can’t error, always runs on the main thread and always emits the last event to subscribers on subscription. This makes Drivers perfect for all your UI needs!

The final solution was adding a .AsDriver(SchedulerProvider) method. This method simply takes an instance of the ISchedulerProvider interface, retrieves its MainThreadScheduler and then uses operators to ensure the observable never fails and is shared. We used the main thread schedulers from the ReactiveUI project and everything worked like a charm 👌
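A minimal sketch of what such an extension could look like follows. It is not our production code: the shape of ISchedulerProvider (a single MainThreadScheduler property), the SchedulerProvider class, the onErrorJustReturn parameter (borrowed from RxSwift's asDriver) and the choice of Catch, ObserveOn and Replay(1).RefCount() are all assumptions made for the example.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Assumed shape of the provider; a real one would likely expose more schedulers.
public interface ISchedulerProvider
{
    IScheduler MainThreadScheduler { get; }
}

// Trivial implementation so that tests can inject any scheduler they like.
public sealed class SchedulerProvider : ISchedulerProvider
{
    public SchedulerProvider(IScheduler mainThreadScheduler)
        => MainThreadScheduler = mainThreadScheduler;

    public IScheduler MainThreadScheduler { get; }
}

public static class DriverExtensions
{
    public static IObservable<T> AsDriver<T>(
        this IObservable<T> source,
        T onErrorJustReturn,
        ISchedulerProvider schedulerProvider)
        => source
            // Never fail: swap any error for a fallback value, then complete.
            .Catch((Exception _) => Observable.Return(onErrorJustReturn))
            // Deliver every notification on the main thread scheduler.
            .ObserveOn(schedulerProvider.MainThreadScheduler)
            // Share one subscription and replay the latest value to late subscribers.
            .Replay(1)
            .RefCount();
}
```

Because the scheduler arrives through the provider, production code can pass a UI scheduler while tests pass a TestScheduler or any other IScheduler.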

🔮 Observables are just monoids in the category of endofunctors, so what?

Monads are historically known for being something hard to explain, but the point here is not making you understand them. While I totally suggest you read more on this subject, my focus here is on something else, so please read on even if you don’t fully get what monads are.

C# has some common monads we use daily. Out of those I’d like to highlight two: IEnumerable<T> and Task<T>. These have powerful language features attached to them: the LINQ query syntax and async/await, respectively.

What’s lesser known in C# land is that these language features can be applied to any type that implements some specific methods, and the Rx.NET package implements these methods for the IObservable<T> interface!

The await operator is used to extract a value of type T from the Task<T> monad. You can literally do the same with Observable!

public Task<int> GetTask()
    => Task.FromResult(1);

public IObservable<int> GetObservable()
    => Observable.Return(1);

int valueFromTask = await GetTask();
int valueFromObservable = await GetObservable();
valueFromTask.Should().Be(valueFromObservable);

In order for observables to become awaitable, all you need to do is add the using System.Reactive.Linq; directive. Simple as that! Bear in mind that when you await an observable you are awaiting its last value (that is, the last value emitted before the observable completes), so awaiting an observable that hasn’t completed yet suspends the caller until it does, and an observable that never completes keeps it suspended forever. See the following example:

var a = Observable.Return(1);
var b = new BehaviorSubject<int>(1);

var value = await a; //This executes normally
var otherValue = await b; //This hangs forever
value.Should().Be(otherValue); //This never gets executed

To prevent such hanging from happening, call obs.FirstAsync(), which turns the observable into one that emits the first value it produces after subscription and then completes. For a BehaviorSubject, that first value is its current (latest) value.
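Here is a small sketch of that fix applied to a subject that never completes. AwaitDemo and CurrentValueAsync are hypothetical names used only for this example.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class AwaitDemo
{
    public static async Task<int> CurrentValueAsync()
    {
        var subject = new BehaviorSubject<int>(1);
        subject.OnNext(2); // The subject never completes.

        // FirstAsync completes right after the first value a new subscriber sees,
        // which for a BehaviorSubject is its current value, so the await returns.
        return await subject.FirstAsync();
    }
}
```

Keep in mind that FirstAsync fails with an InvalidOperationException if the observable completes without emitting anything; FirstOrDefaultAsync is the forgiving variant.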

The other thing we can use to consume IObservable<T> in C# is the LINQ query syntax. Any type that implements Select, Where and a couple of other specially named methods can benefit from this syntax, and IObservable<T> is one of those types. With it, we can flatten nested observables:

var numberSource1 = Observable.Return(2);
var numberSource2 = Observable.Return(Observable.Return(2)); // Nested

var unwrapAndSumFluent =
    numberSource2
        .SelectMany(x => x)
        .CombineLatest(numberSource1, (x, y) => x + y)
        .Select(x => x.ToString());

var unwrapAndSumQuery =
    from obs in numberSource2
    from number in obs
    from otherNumber in numberSource1
    let sum = number + otherNumber
    select sum.ToString();

var query = await unwrapAndSumQuery;
var fluent = await unwrapAndSumFluent;

fluent.Should().Be(query);

And even apply filtering:

var numberSource1 = Observable.Return(2);
var numberSource2 = Observable.Return(2);

var combiningWithFluentSyntax =
    Observable
        .CombineLatest(numberSource1, numberSource2, (number, otherNumber) => number + otherNumber)
        .Where(sum => sum % 2 == 0)
        .Select(sum => sum.ToString());

var combiningWithQuerySyntax =
    from number in numberSource1
    from otherNumber in numberSource2
    let sum = number + otherNumber
    where sum % 2 == 0
    select sum.ToString();

var fluent = await combiningWithFluentSyntax;
var query = await combiningWithQuerySyntax;

fluent.Should().Be(query);

They yield the same value! Note that from a in b is by no means equivalent to CombineLatest; chained from clauses translate to SelectMany, and the two only agree in this example because both source observables emit a single value. The query syntax is especially suitable for combining observables and tasks (assuming that you implement SelectMany and Where for the Task<T> type as well).
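To see the two diverge, feed both pipelines sources that emit more than once. The sketch below uses two plain subjects so that the order of events is fully under our control; QueryVersusCombineLatest and Run are hypothetical names for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class QueryVersusCombineLatest
{
    public static (List<int> Query, List<int> Combined) Run()
    {
        var a = new Subject<int>();
        var b = new Subject<int>();
        var fromQuery = new List<int>();
        var fromCombineLatest = new List<int>();

        // Chained from clauses compile to SelectMany: every value of `a`
        // starts a new subscription to `b` that only sees later values of `b`.
        var query =
            from x in a
            from y in b
            select x + y;

        query.Subscribe(v => fromQuery.Add(v));

        // CombineLatest pairs the latest value of each source on every emission.
        a.CombineLatest(b, (x, y) => x + y)
            .Subscribe(v => fromCombineLatest.Add(v));

        a.OnNext(1);
        b.OnNext(10);
        a.OnNext(2);
        b.OnNext(20);

        // Query syntax collected 11, 21, 22; CombineLatest collected 11, 12, 22.
        return (fromQuery, fromCombineLatest);
    }
}
```

The last emission of b reaches two inner subscriptions in the query version (one per value of a), which is why 21 shows up there and never appears in the CombineLatest output.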

The LINQ query syntax is by no means superior (or inferior) to its fluent counterpart, but knowing both can help you make your code more readable in some cases, so it’s a useful tool in your reactive belt.
