On ReactiveX – Part IV

In the previous parts, first we tried to write a simple app with pure Rx concepts and were consumed by demons, then we disentangled the Frankenstein Observable into its genuinely cohesive components, then organized the zoo of operators by associating them with their applicable targets. Now it’s time to put it all together and fix the problems with our Rx app.

Remember, our requirements are simple: we have a screen that needs to download some data from a server, display parts of the downloaded data in two labels, and have those labels display “Loading…” until the data is downloaded. Let’s recall the major headaches that arose when we tried to do this with Rx Observables:

  • We started off with no control over when the download request was triggered, causing it to be triggered slightly too late
  • We had trouble sharing the results of one Observable without inadvertently changing significant behavior we didn’t want to change. Notice that both this and the previous issue arose from the trigger for making the request being implicit
  • We struggled to find a stream pipeline that correctly mixed the concepts of “hot” and “cold” in such a way that our labels displayed “Loading…” only when necessary.

With our new belt of more precise tools, let’s do this right.

The first Observable we created was one to represent the download of data from the server. The root Observable was a general web call Observable that we create with an HTTPRequest, and whose type parameter is an HTTPResponse. So, which of the four abstractions really is this? It’s a Task. Representing this as a “stream” makes no sense because there is no stream… no multiple values. One request, one response. It’s just a piece of work that takes time, that we want to execute asynchronously, and may produce a result or fail.

We then transformed the HTTPResponse using parsers to get an object representing the DataStructure our server responds with. This is a transformation on the Task. This is just some work we need to do to get a Task with the result we actually need. So, we apply transformations to the HTTP Task, until we end up with a Task that gives us the DataStructure.
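To make “a transformation on the Task” concrete, here is a minimal Java sketch that uses `CompletableFuture` as a stand-in for the Task abstraction. That stand-in is an assumption made for illustration, not the Task type from this series. `HttpResponse`, `fakeHttpCall` and `parse` are hypothetical names invented for the example, no real network call happens, and `DataStructure` is trimmed to two fields.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the raw result of the web call.
class HttpResponse {
    final String body;
    HttpResponse(String body) { this.body = body; }
}

// Trimmed-down version of the data the server responds with.
class DataStructure {
    final String displayInfo;
    final int version;
    DataStructure(String displayInfo, int version) {
        this.displayInfo = displayInfo;
        this.version = version;
    }
}

class TaskMapSketch {
    // Hypothetical web call: one request, one response, completed asynchronously.
    static CompletableFuture<HttpResponse> fakeHttpCall() {
        return CompletableFuture.supplyAsync(() -> new HttpResponse("Hello;3"));
    }

    // Hypothetical parser for a made-up "displayInfo;version" body format.
    static DataStructure parse(HttpResponse response) {
        String[] parts = response.body.split(";");
        return new DataStructure(parts[0], Integer.parseInt(parts[1]));
    }

    // thenApply plays the role of "map" on a Task:
    // still exactly one result, just of a different type.
    static CompletableFuture<DataStructure> fetchDataStructure() {
        return fakeHttpCall().thenApply(TaskMapSketch::parse);
    }

    public static void main(String[] args) {
        // join() blocks until the Task completes; used here only to keep the demo simple.
        DataStructure data = fetchDataStructure().join();
        System.out.println(data.displayInfo + " v" + data.version);
    }
}
```

Note that a `CompletableFuture` starts its work as soon as it is created, which may or may not match how a dedicated Task type would behave.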

Then, what do we do with it? Well, multiple things. What matters is that at some point we have the DataStructure we need from the server. This DataStructure is a value that, at any time, we either have or don’t have, and we’re interested in when it changes. This is an ObservableValue, particularly of a nullable DataStructure. It starts off null, indicating we haven’t retrieved it yet. Once the Task completes, we assign the retrieved result to this ObservableValue.

That last part… having the result of a Task get saved in an ObservableValue… that’s probably a common need. We can write a convenience function for that.
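One possible shape for that convenience function, sketched in Java. The assumptions: `CompletableFuture` stands in for Task, `ObservableValue` is a hypothetical minimal class written just for this example (single-threaded, no unsubscribe), and `Tasks.assignTo` is a static helper because Java has no extension methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical minimal ObservableValue: holds a current value and notifies on change.
class ObservableValue<T> {
    private T value;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ObservableValue(T initial) { this.value = initial; }

    T getValue() { return value; }

    void setValue(T newValue) {
        value = newValue;
        for (Consumer<T> subscriber : subscribers) subscriber.accept(newValue);
    }

    void subscribe(Consumer<T> onChange) { subscribers.add(onChange); }
}

class Tasks {
    // When the task completes, store its result in the destination.
    // If the task fails, the destination is left untouched and the
    // returned future carries the error.
    static <T> CompletableFuture<Void> assignTo(
            CompletableFuture<T> task, ObservableValue<T> destination) {
        return task.thenAccept(destination::setValue);
    }
}

class AssignToSketch {
    public static void main(String[] args) {
        ObservableValue<String> data = new ObservableValue<>(null); // null = not loaded yet
        data.subscribe(v -> System.out.println("data changed to: " + v));

        CompletableFuture<String> download = CompletableFuture.supplyAsync(() -> "payload");
        Tasks.assignTo(download, data).join(); // join only so the demo does not exit early
    }
}
```

In a real UI the update would also need to be delivered on the UI thread, which this sketch does not attempt.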

We then need to pull out two different strings from this data. These are what will be displayed by labels once the data is loaded. We get this by applying a map to the ObservableValue for the DataStructure, resulting in two ObservableValue Strings. But wait… the source ObservableValue DataStructure is nullable. A straight map would produce a nullable String. But we need a non-null String to tell the label what to display. Well, what does a null DataStructure represent? That the data isn’t available yet. What should the labels display in that case? The “Loading…” text! So we null-coalesce the nullable String with the loading text. Since we need to do that multiple times, we can define a reusable operator to do that.

Finally we end up with two public ObservableValue Strings in our ViewModel. The View wires these up to the labels by subscribing and assigning the label text on each update. Remember that ObservableValues give the option to have the subscriber be immediately notified with the current value. That’s exactly what we want! We want the labels to immediately display whatever value is already assigned to those ObservableValues, and then update whenever those values change. This only makes sense for ObservableValues, not for any kind of “stream”, which doesn’t have a “current” value.
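The map, null-coalesce and subscribe-with-initial steps described above might look like this in Java. Treat it as a sketch under assumptions: this `ObservableValue` is a hypothetical, single-threaded stand-in written for the example, `Loading.loadedValue` is a static helper because Java has no extension methods, and `println` stands in for setting the label text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical minimal ObservableValue with an eagerly computed map.
class ObservableValue<T> {
    private T value;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ObservableValue(T initial) { this.value = initial; }

    T getValue() { return value; }

    void setValue(T newValue) {
        value = newValue;
        for (Consumer<T> subscriber : subscribers) subscriber.accept(newValue);
    }

    // Notified on future changes only.
    void subscribe(Consumer<T> onChange) { subscribers.add(onChange); }

    // Notified immediately with the current value, then on every change.
    void subscribeWithInitial(Consumer<T> onChange) {
        onChange.accept(value);
        subscribers.add(onChange);
    }

    // Derived value, computed right away and kept in sync with the source.
    <R> ObservableValue<R> map(Function<T, R> transform) {
        ObservableValue<R> derived = new ObservableValue<>(transform.apply(value));
        subscribe(v -> derived.setValue(transform.apply(v)));
        return derived;
    }
}

class Loading {
    // Replace "not loaded yet" (null) with the placeholder text.
    static ObservableValue<String> loadedValue(ObservableValue<String> source) {
        return source.map(v -> v != null ? v : "Loading...");
    }
}

class LabelBindingSketch {
    public static void main(String[] args) {
        ObservableValue<String> displayInfo = new ObservableValue<>(null);
        ObservableValue<String> labelText = Loading.loadedValue(displayInfo);

        // Stand-in for the label's setText call.
        labelText.subscribeWithInitial(text -> System.out.println("label shows: " + text));

        // Later, the download finishes.
        displayInfo.setValue("Hello from the server");
    }
}
```

Running `main` prints “label shows: Loading...” first and “label shows: Hello from the server” second. The subscriber never has to care whether it was attached before or after the data arrived.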

This is precisely that “not quite hot, not quite cold” behavior we were looking for. Almost all the pain we experienced with our Rx-based attempt was due to us taking an Observable, which includes endless transformations, many of which are geared specifically toward streams, subscribing to it and writing the most recently emitted item to a UI widget. What is that effectively doing? It’s caching the latest item, which as we saw is exactly what a conversion from an EventStream to an ObservableValue does. Rx Observables don’t have a “current value”, but the labels on the screen certainly do! It turned out that the streams we were constructing were very sensitive to timing in ways we didn’t want, and that sensitivity became visible because the labels remembered whatever item was emitted last. By using the correct abstraction, ObservableValue, we simply don’t have all these non-applicable transformations like merge, prepend or replay.

Gone is the need to carefully balance an Observable that gets made hot so it can begin its work early, then caches its values to make it cold again, but only caches one to avoid repeating stale data (remember that caching the latest value from a stream is really a conversion from an EventStream to an… ObservableValue!). All along, we just needed to express exactly what a reactive user interface needs: a value, whose changes over time can be reacted to.

Let’s see it:

class DataStructure
{
    String title;
    int version;
    String displayInfo;
    ...
}

extension Task<Result>
{
    public void assignTo(ObservableValue<Result> destination)
    {
        Task.start(async () ->
        {
            destination.value = await self;
        });
    }
}

extension ObservableValue<String?>
{
    private ObservableValue<String> loadedValue()
    {
        String loadingText = "Loading...";

        return self
            .map(valueIn -> valueIn ?? loadingText);
    }
}

class AppScreenViewModel
{
    ...

    public final ObservableValue<String> dataLabelText;
    public final ObservableValue<String> versionLabelText;

    private final ObservableValue<DataStructure?> data = new ObservableValue<DataStructure?>(null);
    ...

    public AppScreenViewModel()
    {
        ...

        Task<DataStructure> fetchDataStructure = getDataStructureRequest(_httpClient)
            .map(response -> parseResponse<DataStructure>(response));

        fetchDataStructure.assignTo(data);
        
        dataLabelText = data
            .map(dataStructure -> dataStructure?.displayInfo)
            .loadedValue();

        versionLabelText = data
            .map(dataStructure -> dataStructure?.version.toString())
            .loadedValue();
    }
}

...

class AppScreen : View
{
    private TextView dataLabel;
    private TextView versionLabel;

    ...

    private void bindToViewModel(AppScreenViewModel viewModel)
    {
        subscriptions.add(viewModel.dataLabelText
            .subscribeWithInitial(text -> dataLabel.setText(text)));

        subscriptions.add(viewModel.versionLabelText
            .subscribeWithInitial(text -> versionLabel.setText(text)));
    }
}

Voila.

(By the way, I’m only calling ObservableValues “ObservableValue”s to avoid confusing them with Rx Observables. I believe they are what should properly be named Observable, and that’s what I would call them in a codebase that doesn’t import Rx.)

This, I believe, achieves the declarative UI style we’re seeking, that avoids the need to manually trigger UI refreshes and ensures rendered data is never stale, and also avoids the pitfalls of Rx that are the result of improperly hiding multiple incompatible abstractions behind a single interface.

Where can you find an implementation of these concepts? Well, I’m working on it (I’m doing the first pass in Swift, and will follow with C#, Kotlin, C++ and maybe Java implementations), and maybe someone reading this will also start working on it. For the time being you can just build pieces you need when you need them. If you’re building UI, you can do what I’ve done several times and write a quick and dirty ObservableValue abstraction with map, flatMap and combine. You can even be lazy and make them all eagerly stored (it probably isn’t that inefficient to just compute them all eagerly, unless your app is really crazy sophisticated). You’ll get a lot of mileage out of that alone.
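For a sense of how small such a quick and dirty version can be, here is a Java sketch with eagerly stored `map` and `combine`. It is a hypothetical illustration rather than the implementation mentioned above: it is single-threaded, has no unsubscribe, and derived values never detach from their sources. `flatMap` is left out to keep it short, since doing it properly means switching subscriptions between inner values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

// Quick-and-dirty ObservableValue where every derived value is stored eagerly.
class ObservableValue<T> {
    private T value;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ObservableValue(T initial) { this.value = initial; }

    T getValue() { return value; }

    void setValue(T newValue) {
        value = newValue;
        // Iterate over a copy so a subscriber may subscribe others while being notified.
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(newValue);
        }
    }

    void subscribe(Consumer<T> onChange) { subscribers.add(onChange); }

    // Derived value, recomputed whenever the source changes.
    <R> ObservableValue<R> map(Function<T, R> transform) {
        ObservableValue<R> derived = new ObservableValue<>(transform.apply(value));
        subscribe(v -> derived.setValue(transform.apply(v)));
        return derived;
    }

    // Derived value, recomputed whenever either source changes.
    static <A, B, R> ObservableValue<R> combine(
            ObservableValue<A> first, ObservableValue<B> second, BiFunction<A, B, R> combiner) {
        ObservableValue<R> derived =
                new ObservableValue<>(combiner.apply(first.value, second.value));
        first.subscribe(a -> derived.setValue(combiner.apply(a, second.value)));
        second.subscribe(b -> derived.setValue(combiner.apply(first.value, b)));
        return derived;
    }
}

class CombineSketch {
    public static void main(String[] args) {
        ObservableValue<String> title = new ObservableValue<>("Report");
        ObservableValue<Integer> version = new ObservableValue<>(1);

        ObservableValue<String> header =
                ObservableValue.combine(title, version, (t, v) -> t + " (v" + v + ")");

        System.out.println(header.getValue()); // header built from the initial values
        version.setValue(2);
        System.out.println(header.getValue()); // header already reflects the new version
    }
}
```

Because everything is stored eagerly, `header.getValue()` is always a plain field read, at the cost of recomputing on every source change whether or not anyone is looking.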

You can also continue to use Rx, as long as you’re strict about never using Observables to represent DataStreams or Tasks. They can work well enough as EventStreams, and Observables that derive from BehaviorSubjects work reasonably well as ObservableValues (until you need to read the value imperatively). But don’t use Rx as a replacement for asynchrony. Remember that you can always block threads you create, and yes you can create your own threads and block them as you please, and I promise the world isn’t going to end. If you have async/await, remember that it was always the right way to handle DataStreams and Tasks, but don’t try to force it to handle EventStreams or ObservableValues… producer-driven callbacks really are the right tool for that.

Follow these rules and you can even continue to use Rx and never tear your hair out trying to figure out what the “temperature” of your pipelines is.
