On ReactiveX – Part I

If a tree falls in the forest and no one hears it, does it make a sound?

The ReactiveX libraries have finally answered this age-old philosophical dilemma. If no one is listening for the tree falling, not only does it not make a sound, the tree didn’t even fall. In fact, the wind that knocked the tree down didn’t even blow. If no one’s in the forest, then the forest doesn’t exist at all.

Furthermore, if there are three people in the forest listening, there are three separate sounds that get made. Not only that, there are three trees, each one making a sound. And there are three gusts of wind to knock each one down. There are, in fact, three forests.

ReactiveX is the Copenhagen Interpretation on steroids (or, maybe, just taken to its logical conclusion). We don’t just discard counterfactual definiteness, we take it out back and shoot it. What better way to implement Schrodinger’s Cat in your codebase than this:

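import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.Random;

// Cat and BluntInstrument are this story's own (imaginary) classes.
final class SchrodingersCat extends Observable<Boolean>
{
    public SchrodingersCat()
    {
        cat = new Cat("Mittens");
    }

    @Override
    protected void subscribeActual(@NonNull Observer<? super Boolean> observer)
    {
        observer.onSubscribe(Disposable.empty());

        // Open the box (and roll the dice) only for the first observer.
        if(!observed)
        {
            observed = true;

            boolean geigerCounterTripped = new Random().nextInt(2) == 0;
            if(geigerCounterTripped)
                new BluntInstrument().murder(cat);
        }

        observer.onNext(cat.alive());
        observer.onComplete();
    }

    private final Cat cat;

    private boolean observed = false;
}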

In this example, I have to go out of my way to prevent each new observer from re-running the experiment, which would effectively give every observer its own cat with its own fate. Most Observables aren’t like that.

When you first learn about ReactiveX (Rx, as I will refer to it from now on), it’s pretty cool. The idea of transforming event streams, whose values occur over time, with the same operations (map, filter, zip, reduce, etc.) you use to transform collections (Arrays, Dictionaries, etc.), whose values occur over space (memory, or some other storage location), immediately struck me as extremely powerful. And, to be sure, it is. This began the Rx Honeymoon. The first thing I knew would benefit massively from these abstractions was something I had already learned to write reactively, without the help of explicit abstractions for that purpose: graphical user interfaces.
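To make the “over time versus over space” idea concrete, here is a toy sketch in plain Java. It uses no Rx library and is not how any Rx implementation works internally; `ToyStream` and `SpaceVsTime` are made-up names, and `List.of` assumes Java 9 or later. The same filter-then-map pipeline runs once over a list already sitting in memory and once over values pushed in one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Toy push-based stream (hypothetical, not an Rx API): values arrive one at
// a time through emit(), and map/filter build derived streams.
final class ToyStream<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();

    void listen(Consumer<T> listener) { listeners.add(listener); }

    void emit(T value) {
        for (Consumer<T> listener : listeners) listener.accept(value);
    }

    <R> ToyStream<R> map(Function<T, R> f) {
        ToyStream<R> out = new ToyStream<>();
        listen(v -> out.emit(f.apply(v)));
        return out;
    }

    ToyStream<T> filter(Predicate<T> p) {
        ToyStream<T> out = new ToyStream<>();
        listen(v -> { if (p.test(v)) out.emit(v); });
        return out;
    }
}

final class SpaceVsTime {
    // Values laid out in memory: the whole collection is transformed at once.
    static List<Integer> overSpace(List<Integer> xs) {
        return xs.stream().filter(x -> x > 100).map(x -> x * 2)
                 .collect(Collectors.toList());
    }

    // Values arriving over time: the same pipeline runs as each value shows up.
    static List<Integer> overTime(List<Integer> xs) {
        ToyStream<Integer> clicks = new ToyStream<>();
        List<Integer> seen = new ArrayList<>();
        clicks.filter(x -> x > 100).map(x -> x * 2).listen(seen::add);
        for (int x : xs) clicks.emit(x); // simulate events trickling in
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> xs = List.of(50, 150, 300);
        System.out.println(overSpace(xs)); // [300, 600]
        System.out.println(overTime(xs));  // [300, 600]
    }
}
```

Both calls produce the same result; the only difference is whether the values were all in memory up front or arrived one by one.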

But, encouraged by the “guides”, I didn’t stop there. “Everything is an event stream”, they said. They showed me the classic example of executing a web request, parsing its result, and attaching it to some view on the UI. It seemed like magic. Just define your API service’s call as an Observable, which is just a map of the Observable for a general HTTP request (if your platform doesn’t provide one for you, you can easily write it by bridging a callback interface to an event stream). Then do some more mapping and you have a text label that displays “Loading…” until the data is downloaded, then automatically switches to display the loaded data:

class DataStructure
{
    String title;
    int version;
    String displayInfo;
    ...
}

class AppScreenViewModel
{
    ...

    public final Observable<String> dataLabelText;

    ...

    public AppScreenViewModel()
    {
        ...

        Observable<HTTPResponse> response = _httpClient.request(
            "https://myapi.com/getdatastructure",
            HTTPMethod.GET
        );

        Observable<DataStructure> parsedResponse = response
            .map(httpResponse -> new JSONParser().parse(httpResponse.body, new DataStructure()));

        Observable<String> loadedText = parsedResponse
            .map(dataStructure -> dataStructure.displayInfo);

        Observable<String> loadingText = Observable.just("Loading...");

        dataLabelText = loadingText
            .merge(loadedText);
    }
}

...

class AppScreen extends View
{
    private TextView dataLabel;

    ...

    private void bindToViewModel(AppScreenViewModel viewModel)
    {
        subscription = viewModel.dataLabelText
            .subscribe(text -> dataLabel.setText(text));
    }
}

That’s pretty neat. But you wouldn’t actually write it like this; I spelled out every step only to illustrate what’s going on. It would more likely look something like this:

    public AppScreenViewModel()
    {
        ...

        dataLabelText = getDataStructureRequest(_httpClient)
            .map(response -> parseResponse(response, new DataStructure()))
            .map(dataStructure -> dataStructure.displayInfo)
            .merge(Observable.just("Loading..."));

        ...
    }

And, of course, you’d want to move the low-level HTTP client stuff out of the ViewModel. What you get is an elegant expression of a pipeline of retrieval and processing steps, with the end of the pipe plugged into your UI. Pretty neat!

But… hold on. I’m confused. I have my UI subscribe (that is, listen) to a piece of data that, through a chain of processing steps, depends on the response to an HTTP request. I can understand why, once the response comes in, the data makes its way to the text label. But where did I request the data? Where did I tell the system to go ahead and issue the HTTP request, so that eventually all of this will get triggered?

The answer is that it happens automatically when something subscribes to this pipeline of events. That is also when it happens. The subscription happens in bindToViewModel: that method calls subscribe on the observable string, which in turn subscribes to each observable upstream of it, because that’s how the Observables returned by operators like map and merge work. That chain of subscribes eventually reaches the HTTP request observable, and that is what finally sends the request.
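Here is a toy model of that subscription chain. It is plain Java rather than RxJava, so treat it as a sketch of the idea, not the library’s code; `Obs`, the callback-style `CallbackHttpClient`, and the fake client that answers instantly are all hypothetical. Building the pipeline sends nothing. The request goes out only when the label subscribes, because `map` subscribes to its upstream when it is itself subscribed to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy cold observable (a sketch, not RxJava's implementation): it is just
// "what to do when someone subscribes", so nothing runs until subscribe().
interface Obs<T> {
    void subscribe(Consumer<T> onNext);

    default <R> Obs<R> map(Function<T, R> f) {
        Obs<T> upstream = this;
        // Subscribing to the mapped stream subscribes to its upstream.
        return down -> upstream.subscribe(v -> down.accept(f.apply(v)));
    }
}

// Hypothetical callback-style HTTP client standing in for the platform's API.
interface CallbackHttpClient {
    void get(String url, Consumer<String> onResponse);
}

final class SubscribeTriggersRequest {
    static int requestsSent = 0;

    // Bridges the callback API to a stream: every subscribe issues a new request.
    static Obs<String> request(CallbackHttpClient client, String url) {
        return onNext -> client.get(url, onNext);
    }

    static List<String> run() {
        requestsSent = 0;
        List<String> log = new ArrayList<>();
        CallbackHttpClient fakeClient = (url, onResponse) -> {
            requestsSent++;
            onResponse.accept("data"); // answers immediately, like a local dev server
        };

        Obs<String> labelText = request(fakeClient, "https://myapi.com/getdatastructure")
            .map(body -> "Loaded: " + body);
        log.add("requests after building the pipeline: " + requestsSent);

        labelText.subscribe(text -> log.add("label shows " + text));
        log.add("requests after subscribing: " + requestsSent);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```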

Okay… that makes sense, I guess. But it’s kind of a waste of time to wait until then to send the request out. We’re ready to start downloading the data as soon as the view-model is constructed. Minor issue, I guess, since in this case these two times are probably a fraction of a second apart.

But now let’s say I also want to send that version number to another text label:

class DataStructure
{
    String title;
    int version;
    String displayInfo;
    ...
}

class AppScreenViewModel
{
    ...

    public final Observable<String> dataLabelText;
    public final Observable<String> versionLabelText;

    ...

    public AppScreenViewModel()
    {
        ...

        Observable<DataStructure> dataStructure = getDataStructureRequest(_httpClient)
            .map(response -> parseResponse(response, new DataStructure()));

        String loadingText = "Loading...";

        dataLabelText = dataStructure
            .map(data -> data.displayInfo)
            .merge(Observable.just(loadingText));

        versionLabelText = dataStructure
            .map(data -> String.valueOf(data.version))
            .merge(Observable.just(loadingText));
    }
}

...

class AppScreen extends View
{
    private TextView dataLabel;
    private TextView versionLabel;

    ...

    private void bindToViewModel(AppScreenViewModel viewModel)
    {
        subscriptions.add(viewModel.dataLabelText
            .subscribe(text -> dataLabel.setText(text)));

        subscriptions.add(viewModel.versionLabelText
            .subscribe(text -> versionLabel.setText(text)));
    }
}

I fire up my app, and then notice in my web proxy that the call to my API went out twice. Why did that happen? I didn’t create two of the HTTP request observables. But remember I said that the request gets triggered in subscribe? Well, we can clearly see two subscribes here. They are each to different observables, but both of those observables are built by operators starting from the HTTP request observable. Their subscribe methods call subscribe on the “upstream” observable. Thus, each of the two chains eventually calls subscribe, once, on the HTTP request observable, and each of those calls sends a request.

The honeymoon is wearing off.

Obviously this isn’t acceptable. I need to fix it so that only one request gets made. The ReactiveX docs refer to these kinds of observables as cold. They don’t do anything until you subscribe to them, and when you do, they emit the same items for each subscriber. Normally, we might think of “items” as just values. So at worst this just means we’re making copies of our structures. But really, producing an “item” in this world can involve any arbitrary code that runs when the value is produced. This is what makes it possible to stuff very nontrivial behavior, like executing an HTTP request, inside an observable. By “producing” the value of the HTTP response, we execute the code that calls the HTTP client. If we produce that value for “n” subscribers, we literally have to produce it “n” times, which means we call the service “n” times.
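The same kind of toy model (plain Java, not RxJava; `Obs`, the `Report v3` payload and the request counter are made up for illustration) reproduces the double request: two labels derived from one response stream, each subscribed once, and the counter standing in for the HTTP client ends up at 2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy cold observable (a sketch, not RxJava's implementation).
interface Obs<T> {
    void subscribe(Consumer<T> onNext);

    default <R> Obs<R> map(Function<T, R> f) {
        Obs<T> upstream = this;
        return down -> upstream.subscribe(v -> down.accept(f.apply(v)));
    }
}

final class TwoSubscribersTwoRequests {
    static int requestsSent = 0;

    // Stand-in for the HTTP request: producing the item has a side effect
    // (counting a request), and it is produced anew for every subscriber.
    static final Obs<String> response = onNext -> {
        requestsSent++;
        onNext.accept("Report v3");
    };

    static List<String> run() {
        requestsSent = 0;
        List<String> labels = new ArrayList<>();
        Obs<String> dataLabelText = response.map(r -> "title: " + r.split(" ")[0]);
        Obs<String> versionLabelText = response.map(r -> "version: " + r.split(" ")[1]);

        dataLabelText.subscribe(labels::add);    // chain 1 subscribes to response...
        versionLabelText.subscribe(labels::add); // ...and so does chain 2
        return labels;
    }

    public static void main(String[] args) {
        List<String> labels = run();
        System.out.println(labels + " after " + requestsSent + " requests");
    }
}
```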

The nontrivial code that runs as part of producing the next value in a stream is what we call side effects. This is where the hyper-Copenhagen view of reality starts getting complicated (if it wasn’t already). That tree falling sound causes stuff on its own. It chases birds off, and shakes leaves off of branches. Maybe it spooks a deer, causing it to run into a street, which causes a car driving by to swerve into a utility pole, knocking it down and cutting off the power to a neighborhood miles away. So now, “listening” to the tree falling sound means being aware of anything that was caused by that sound. Sitting in my living room and having the lights go out now makes me an observer of that sound.

There’s a reason Schrodinger put the cat in a box: to try as best he could to isolate events inside the box from events outside. Real life isn’t so simple. “Optimizing” out the unobserved part of existence requires you to draw a line (or box?) around all the effects of a cause. The Butterfly Effect laughs derisively at the very suggestion.

Not all Observables are like this. Some of them are hot. They emit items completely on their own terms, even if no subscribers are present. By subscribing, you’ll receive the same values at the same time as any other subscribers. If one subscriber subscribes late, they’ll miss any previously emitted items. An example would be an Observable for mouse clicks. Obviously a new subscriber can’t make you click the mouse again, and you can click the mouse before any subscribers show up.
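A toy hot stream in plain Java (hypothetical names, not an Rx API) shows the difference: `click` pushes to whoever is subscribed at that moment, so a late subscriber only sees what happens after it arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy hot stream (a sketch, not RxJava's implementation): it emits whenever
// the outside world says so, to whoever happens to be subscribed right then.
final class HotClicks {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> onNext) { subscribers.add(onNext); }

    // Called by the "mouse", whether or not anyone is listening.
    void click(String where) {
        for (Consumer<String> s : subscribers) s.accept(where);
    }

    static List<String> run() {
        HotClicks clicks = new HotClicks();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        clicks.click("A");           // nobody listening: lost for good
        clicks.subscribe(early::add);
        clicks.click("B");
        clicks.subscribe(late::add); // arrives after B was emitted
        clicks.click("C");

        return List.of("early=" + early, "late=" + late);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it prints `early=[B, C]` and `late=[C]`; click `A` reaches nobody, and subscribing cannot make it happen again.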

To fix our problem, we need to convert the cold HTTP response observable to a hot one. We want it to emit its value (the HTTP response, which as a side effect will trigger the HTTP request) of its own accord, independent of who subscribes. This will solve both the problem of waiting too long to start the request, and having the request go out twice. To do this, Rx gives us a subclass of Observable called ConnectableObservable. In addition to subscribe, these also have a method connect, which triggers them to start emitting items. I can use the publish operator to turn a cold observable into a connectable hot one. This way, I can start the request immediately, without duplicating it:

        ConnectableObservable<DataStructure> dataStructure = getDataStructureRequest(_httpClient)
            .map(response -> parseResponse(response, new DataStructure()))
            .publish();

        dataStructure.connect();

        String loadingText = "Loading...";

        dataLabelText = dataStructure
            .map(data -> data.displayInfo)
            .merge(Observable.just(loadingText));

        versionLabelText = dataStructure
            .map(data -> String.valueOf(data.version))
            .merge(Observable.just(loadingText));

Now I fire it up again. Only one request goes out! Yay!! But wait… both of my labels still say “Loading…”. What happened? They never updated.

The response observable is now hot: it emits items on its own. Whatever subscribers are there when that item gets emitted are triggered. Any subscribers that show up later miss earlier items. Well, my dev server running in a VM on my laptop here served up that API response in milliseconds, faster than the time between this code running and the View code subscribing to these observables. By the time they subscribed, the response had already been emitted, and the subscribers miss it.
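A toy version of `publish`/`connect` reproduces this. It is a plain-Java sketch, not RxJava’s implementation, and `Published` plus the synchronously answering fake request are assumptions for illustration. The request goes out exactly once on `connect()`, and because the fake server answers before anyone subscribes, both subscribers receive nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy observable interface (a sketch, not RxJava's implementation).
interface Obs<T> {
    void subscribe(Consumer<T> onNext);
}

// Toy publish(): connect() subscribes to the upstream exactly once and
// forwards each item to whoever is subscribed at that moment. Nothing is cached.
final class Published<T> implements Obs<T> {
    private final Obs<T> upstream;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    Published(Obs<T> upstream) { this.upstream = upstream; }

    void connect() {
        upstream.subscribe(v -> {
            for (Consumer<T> s : new ArrayList<>(subscribers)) s.accept(v);
        });
    }

    @Override
    public void subscribe(Consumer<T> onNext) { subscribers.add(onNext); }
}

final class PublishTooEarly {
    static int requestsSent = 0;

    static List<String> run() {
        requestsSent = 0;
        // Hypothetical request that answers synchronously, like a dev server in a local VM.
        Obs<String> response = onNext -> { requestsSent++; onNext.accept("Loaded"); };

        Published<String> shared = new Published<>(response);
        shared.connect(); // view-model constructor: the request goes out right away

        List<String> received = new ArrayList<>();
        shared.subscribe(received::add); // the view binds afterwards...
        shared.subscribe(received::add); // ...so both labels missed the only item
        return received;
    }

    public static void main(String[] args) {
        List<String> received = run();
        System.out.println(received + " after " + requestsSent + " request(s)");
    }
}
```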

Okay, back to the Rx books. There’s an operator called replay, which will give us a connectable observable that begins emitting as soon as we call connect, but also caches the items that come in. When anyone subscribes, it first powers through any of those cached items, sending them to the new subscriber in rapid succession, to ensure that every subscriber sees the same sequence of items:

        ConnectableObservable<DataStructure> dataStructure = getDataStructureRequest(_httpClient)
            .map(response -> parseResponse(response, new DataStructure()))
            .replay();

        dataStructure.connect();

        String loadingText = "Loading...";

        dataLabelText = dataStructure
            .map(data -> data.displayInfo)
            .merge(Observable.just(loadingText));

        versionLabelText = dataStructure
            .map(data -> String.valueOf(data.version))
            .merge(Observable.just(loadingText));

I fire it up, still see one request go out, but then… I see my labels briefly flash with the loaded text, then go back to “Loading…”. What the fu…

Think carefully about the last operator, the merge. If the response comes in before anyone subscribes, we’re actually constructing a stream that emits first the response-derived string and then the text “Loading…”. So it’s doing what we told it to do. It’s just confusing. The replay operator, as I said, fires off the exact sequence of emitted items, in the order they were originally emitted. That’s what I’m seeing.

But wait… I’m not replaying the merged stream. I’m replaying the upstream event of the HTTP response. Now it’s not even clear to me what that means. I need to think about this… the dataStructure stream is a replay of the underlying stream that makes the request, emits the response, then maps it to the parsed object. That all happens almost instantaneously after I call connect. That one item gets cached, and when anyone subscribes, it loops through and emits the cached items, which is just that one. Then I merge this with a Just stream. What does Just mean, again? Well, that’s a stream that emits just the item given to it whenever you subscribe to it. Each subscriber gets that one item. Okay, and what does merge do? Well, the subscribe method of a merged stream subscribes to both the upstream observables used to build it, so that the subscriber gets triggered by either one’s emitted items. It has to subscribe to both in some order, and I guess it makes sense that it first subscribes to the stream on which merge was called, and then subscribes to the other stream passed in as a parameter.

So here’s what happens. By the time I call subscribe on what happens to be a merged stream, the replay stream already has a cached item. The merged stream first subscribes to the replay stream, which immediately emits that cached item to the subscriber. Then it subscribes to the Just stream, which immediately emits the loading text. Hence, I see the loaded text, then the loading text.

If I swapped the operands so that the Just is what I call merge on, and the mapped data structure stream is the parameter, then the order reverses. That’s scary. I didn’t even think to consider that the placement of those two in the call would matter.
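A toy model of `replay` and `merge` (plain Java, not RxJava’s code; all names are hypothetical) reproduces both orders. Note that it bakes in the subscription order described above, the receiver first and then the argument; that is an assumption carried over from the text, not something this sketch proves about any real library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy Rx-like model (a sketch, not RxJava's implementation).
interface Obs<T> {
    void subscribe(Consumer<T> onNext);

    default <R> Obs<R> map(Function<T, R> f) {
        Obs<T> upstream = this;
        return down -> upstream.subscribe(v -> down.accept(f.apply(v)));
    }

    // Toy merge: subscribes to this stream first, then to `other`.
    default Obs<T> merge(Obs<T> other) {
        Obs<T> upstream = this;
        return down -> {
            upstream.subscribe(down);
            other.subscribe(down);
        };
    }

    // Emits `value` to each subscriber as soon as it subscribes.
    static <T> Obs<T> just(T value) {
        return down -> down.accept(value);
    }
}

// Toy replay(): connect() subscribes upstream once and caches every item;
// each later subscriber first receives the cached items, then live ones.
final class Replayed<T> implements Obs<T> {
    private final Obs<T> upstream;
    private final List<T> cache = new ArrayList<>();
    private final List<Consumer<T>> live = new ArrayList<>();

    Replayed(Obs<T> upstream) { this.upstream = upstream; }

    void connect() {
        upstream.subscribe(v -> {
            cache.add(v);
            for (Consumer<T> c : new ArrayList<>(live)) c.accept(v);
        });
    }

    @Override
    public void subscribe(Consumer<T> onNext) {
        for (T v : cache) onNext.accept(v);
        live.add(onNext);
    }
}

final class MergeOrder {
    static List<List<String>> run() {
        Obs<String> fastResponse = onNext -> onNext.accept("Hello");
        Replayed<String> data = new Replayed<>(fastResponse);
        data.connect(); // the response is cached before anyone subscribes

        Obs<String> loadedText = data.map(s -> "Loaded: " + s);

        List<String> asWritten = new ArrayList<>();
        loadedText.merge(Obs.just("Loading...")).subscribe(asWritten::add);

        List<String> swapped = new ArrayList<>();
        Obs.just("Loading...").merge(loadedText).subscribe(swapped::add);

        return List.of(asWritten, swapped);
    }

    public static void main(String[] args) {
        // [[Loaded: Hello, Loading...], [Loading..., Loaded: Hello]]
        System.out.println(run());
    }
}
```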

Sigh… okay, I need to express that the loading text needs to always come before the loaded text. Instead of using merge, I need to use prepend. That makes sure all the events of the stream I pass in will get emitted before any events from the other stream:

    ConnectableObservable<DataStructure> dataStructure = getDataStructureRequest(_httpClient)
        .map(response -> parseResponse(response, new DataStructure()))
        .replay();

    dataStructure.connect();

    String loadingText = "Loading...";

    dataLabelText = dataStructure
        .map(data -> data.displayInfo)
        .prepend(Observable.just(loadingText));

    versionLabelText = dataStructure
        .map(data -> String.valueOf(data.version))
        .prepend(Observable.just(loadingText));

Great, now the labels look right! But wait… I always see “Loading…” briefly flash on the screen. All the trouble I just dealt with stemmed from my dev server responding before my view was created. I shouldn’t ever see “Loading…”, because by the time the labels are being drawn, the loaded text is available.

But the above explanation covers this as well. We’ve constructed a stream where every subscriber will get the “Loading…” item first, even if the loaded text comes immediately after. The prepend operator produces a cold stream. It always emits the items in the provided stream before switching to the one we’re prepending to.
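The toy model (plain Java, not RxJava; the operator RxJava actually provides for this is `startWith`) shows why the flash is unavoidable with `prepend`: even with the loaded text already cached, the prepended stream is subscribed to first. As before, `Obs` and `Replayed` are hypothetical names, and this toy has no completion signal, so it assumes the prepended stream finishes synchronously, as `just` does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy Rx-like model (a sketch, not RxJava's implementation).
interface Obs<T> {
    void subscribe(Consumer<T> onNext);

    default <R> Obs<R> map(Function<T, R> f) {
        Obs<T> upstream = this;
        return down -> upstream.subscribe(v -> down.accept(f.apply(v)));
    }

    // Toy prepend: everything from `first`, then this stream's items.
    default Obs<T> prepend(Obs<T> first) {
        Obs<T> upstream = this;
        return down -> {
            first.subscribe(down);
            upstream.subscribe(down);
        };
    }

    static <T> Obs<T> just(T value) {
        return down -> down.accept(value);
    }
}

// Toy replay(): caches every item and replays the cache to new subscribers.
final class Replayed<T> implements Obs<T> {
    private final Obs<T> upstream;
    private final List<T> cache = new ArrayList<>();
    private final List<Consumer<T>> live = new ArrayList<>();

    Replayed(Obs<T> upstream) { this.upstream = upstream; }

    void connect() {
        upstream.subscribe(v -> {
            cache.add(v);
            for (Consumer<T> c : new ArrayList<>(live)) c.accept(v);
        });
    }

    @Override
    public void subscribe(Consumer<T> onNext) {
        for (T v : cache) onNext.accept(v);
        live.add(onNext);
    }
}

final class PrependAlwaysFlashes {
    static List<String> run() {
        Obs<String> fastResponse = onNext -> onNext.accept("Hello");
        Replayed<String> data = new Replayed<>(fastResponse);
        data.connect(); // the loaded data is already cached...

        List<String> labelUpdates = new ArrayList<>();
        data.map(s -> "Loaded: " + s)
            .prepend(Obs.just("Loading..."))
            .subscribe(labelUpdates::add); // ...yet "Loading..." still comes first
        return labelUpdates;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [Loading..., Loaded: Hello]
    }
}
```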

The stream is still too cold. I don’t want the subscribers to always see the full sequence of items. If they come in late, I want them to only see the latest ones. But I don’t want the stream to be entirely hot either. That would mean if the subscribers come in after the loaded text is emitted, they’ll never receive any events. I need to Goldilocks this stream. I want subscribers to only receive the last item emitted, and none before that. I need to move the replay downstream onto the prepended stream, and I need to specify that the cached items should never exceed a single one:

    Observable<DataStructure> dataStructure = getDataStructureRequest(_httpClient)
        .map(response -> parseResponse(response, new DataStructure()));

    String loadingText = "Loading...";

    ConnectableObservable<String> dataLabel = dataStructure
        .map(data -> data.displayInfo)
        .prepend(Observable.just(loadingText))
        .replay(1);

    dataLabel.connect();
    dataLabelText = dataLabel;

    ConnectableObservable<String> versionLabel = dataStructure
        .map(data -> String.valueOf(data.version))
        .prepend(Observable.just(loadingText))
        .replay(1);

    versionLabel.connect();
    versionLabelText = versionLabel;

Okay, there we go, the flashing is gone. Oh shit! Now two requests are going out again. By moving the replay downstream of the point where the stream splits, each branch subscribes and caches its own item, so each one triggers the HTTP response to get “produced”. Uggg… I have to keep that first replay to “share” the response with each derived stream and ensure each one gets it even if it came in before their own connect calls.
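Here is a toy sketch of the replay placement problem (plain Java, not RxJava; `Obs`, `Replayed`, `ReplayPlacement` and the `Report v3` payload are made up). `run(false)` puts a `replay(1)`-style cache only on each label and counts two requests; `run(true)` also keeps a shared replay on the response before the split, the arrangement the text lands on, and counts one. In both cases the late subscriber sees only the newest text, with no “Loading…” flash.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy Rx-like model (a sketch, not RxJava's implementation).
interface Obs<T> {
    void subscribe(Consumer<T> onNext);

    default <R> Obs<R> map(Function<T, R> f) {
        Obs<T> upstream = this;
        return down -> upstream.subscribe(v -> down.accept(f.apply(v)));
    }

    // Toy prepend: `first`'s items, then ours (assumes `first` is synchronous).
    default Obs<T> prepend(Obs<T> first) {
        Obs<T> upstream = this;
        return down -> { first.subscribe(down); upstream.subscribe(down); };
    }

    static <T> Obs<T> just(T value) { return down -> down.accept(value); }
}

// Toy replay(limit): connect() subscribes upstream once and keeps at most
// `limit` of the newest items for subscribers that arrive later.
final class Replayed<T> implements Obs<T> {
    private final Obs<T> upstream;
    private final int limit;
    private final Deque<T> cache = new ArrayDeque<>();
    private final List<Consumer<T>> live = new ArrayList<>();

    Replayed(Obs<T> upstream, int limit) { this.upstream = upstream; this.limit = limit; }

    void connect() {
        upstream.subscribe(v -> {
            cache.addLast(v);
            if (cache.size() > limit) cache.removeFirst();
            for (Consumer<T> c : new ArrayList<>(live)) c.accept(v);
        });
    }

    @Override
    public void subscribe(Consumer<T> onNext) {
        for (T v : cache) onNext.accept(v);
        live.add(onNext);
    }
}

final class ReplayPlacement {
    static int requestsSent = 0;

    // shareResponse = also keep a replay on the response before the split.
    static List<String> run(boolean shareResponse) {
        requestsSent = 0;
        Obs<String> request = onNext -> { requestsSent++; onNext.accept("Report v3"); };
        Obs<String> response = request;
        if (shareResponse) {
            Replayed<String> shared = new Replayed<>(request, Integer.MAX_VALUE);
            shared.connect();
            response = shared;
        }

        // Per-label replay(1), so late subscribers see only the newest text.
        Replayed<String> dataLabelText = new Replayed<>(
            response.map(r -> "title: " + r.split(" ")[0]).prepend(Obs.just("Loading...")), 1);
        dataLabelText.connect();
        Replayed<String> versionLabelText = new Replayed<>(
            response.map(r -> "version: " + r.split(" ")[1]).prepend(Obs.just("Loading...")), 1);
        versionLabelText.connect();

        List<String> shown = new ArrayList<>(); // the view subscribes last
        dataLabelText.subscribe(shown::add);
        versionLabelText.subscribe(shown::add);
        return shown;
    }

    public static void main(String[] args) {
        for (boolean share : new boolean[] {false, true}) {
            List<String> shown = run(share);
            System.out.println("share=" + share + ": " + shown + ", requests=" + requestsSent);
        }
    }
}
```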

This is all the complexity we have to deal with to handle a simple Y-shaped network of streams driving two labels on a user interface. Can you imagine building an app of even moderate complexity as an intricate network of streams, and having to worry about how “hot” or “cold” each edge in the stream graph is?

Is the honeymoon over yet?

All this highly divergent behavior is hidden behind a single interface called Observable, which intentionally obscures it from the users of the interface. When an object hands you an Observable to use in some way, you have no idea what kind of observable it is. That makes it difficult or impossible to track down or even understand why a system built out of reactive event streams is behaving the way it is.

This is the point where I throw up my hands and say wait wait wait… why am I trying to say an HTTP request is a stream of events? It’s not a stream of any sort. There’s only one response. How is that a “stream”? What could possibly make me think that’s the appropriate abstraction to use here?

Ohhh, I see… it’s asynchronous. Rx isn’t just a library with a powerful transformable event stream abstraction. It’s the “all purpose spray” of concurrency! Any time I ever need to do anything with a callback, which I put in because I don’t want to block a thread for a long-running process, apparently that’s a cue to turn it into an Observable and then have it spread to everything that gets triggered by that callback, and so on and so on. Great. I graduated from callback hell to Rx hell. I’ll have to consult Dante’s map to see if that moved me up a level or down.

In the next part, I’ll talk about whether any of the Rx stuff is worth salvaging, and why things went so off the rails.
