On ReactiveX – Part IV

In the previous parts, first we tried to write a simple app with pure Rx concepts and were consumed by demons, then we disentangled the Frankenstein Observable into its genuinely cohesive components, then organized the zoo of operators by associating them with their applicable targets. Now it’s time to put it all together and fix the problems with our Rx app.

Remember, our requirements are simple: we have a screen that needs to download some data from a server, display parts of the downloaded data in two labels, and have those labels display “Loading…” until the data is downloaded. Let’s recall the major headaches that arose when we tried to do this with Rx Observables:

  • We started off with no control over when the download request was triggered, causing it to be triggered slightly too late
  • We had trouble sharing the results of one Observable without inadvertently changing significant behavior we didn’t want to change. Notice that both this and the previous issue arose from the trigger for making the request being implicit
  • We struggled to find a stream pipeline that correctly mixed the concepts of “hot” and “cold” in such a way that our labels displayed “Loading…” only when necessary

With our new belt of more precise tools, let’s do this right.

The first Observable we created was one to represent the download of data from the server. The root Observable was a general web call Observable that we created with an HTTPRequest, and whose type parameter was HTTPResponse. So, which of the four abstractions is this really? It’s a Task. Representing this as a “stream” makes no sense because there is no stream… no multiple values. One request, one response. It’s just a piece of work that takes time, that we want to execute asynchronously, and that may produce a result or fail.

We then transformed the HTTPResponse using parsers to get an object representing the DataStructure our server responds with. This is a transformation on the Task. This is just some work we need to do to get a Task with the result we actually need. So, we apply transformations to the HTTP Task, until we end up with a Task that gives us the DataStructure.

Then, what do we do with it? Well, multiple things. What matters is at some point we have the DataStructure we need from the server. This DataStructure is a value that, at any time, we either have or don’t have, and we’re interested in when it changes. This is an ObservableValue, particularly of a nullable DataStructure. It starts off null, indicating we haven’t retrieved it yet. Once the Task completes, we assign this ObservableValue to the retrieved result.

That last part… having the result of a Task get saved in an ObservableValue… that’s probably a common need. We can write a convenience function for that.

We then need to pull out two different strings from this data. These are what will be displayed by labels once the data is loaded. We get this by applying a map to the ObservableValue for the DataStructure, resulting in two ObservableValue Strings. But wait… the source ObservableValue DataStructure is nullable. A straight map would produce a nullable String. But we need a non-null String to tell the label what to display. Well, what does a null DataStructure represent? That the data isn’t available yet. What should the labels display in that case? The “Loading…” text! So we null-coalesce the nullable String with the loading text. Since we need to do that multiple times, we can define a reusable operator to do that.

Finally we end up with two public ObservableValue Strings in our ViewModel. The View wires these up to the labels by subscribing and assigning the label text on each update. Remember that ObservableValues give the option to have the subscriber be immediately notified with the current value. That’s exactly what we want! We want the labels to immediately display whatever value is already assigned to those ObservableValues, and then update whenever those values change. This only makes sense for ObservableValues, not for any kind of “stream”, which doesn’t have a “current” value.

This is precisely the “not quite hot, not quite cold” behavior we were looking for. Almost all the pain we experienced with our Rx-based attempt came from taking an Observable (with its endless transformations, many of them geared specifically toward streams), subscribing to it, and writing the most recently emitted item to a UI widget. What is that effectively doing? It’s caching the latest item, which, as we saw, is exactly what a conversion from an EventStream to an ObservableValue does. Rx Observables don’t have a “current value”, but the labels on the screen certainly do! The streams we were constructing were sensitive to timing in ways we didn’t want, and that sensitivity was exposed by the act of remembering whatever the latest emitted item was. By using the correct abstraction, ObservableValue, we simply don’t have all these non-applicable transformations like merge, prepend or replay.

Gone is the need to carefully balance an Observable that gets made hot so it can begin its work early, then caches its values to make it cold again, but only caches one to avoid repeating stale data (remember that caching the latest value from a stream is really a conversion from an EventStream to a… ObservableValue!). All along, we just needed to express exactly what a reactive user interface needs: a value, whose changes over time can be reacted to.

Let’s see it:

class DataStructure
{
    String title;
    int version;
    String displayInfo;
    ...
}

extension Task<Result>
{
    public void assignTo(ObservableValue<Result> destination)
    {
        Task.start(async () ->
        {
            destination.value = await self;
        });
    }
}

extension ObservableValue<String?>
{
    public ObservableValue<String> loadedValue()
    {
        String loadingText = "Loading...";

        return self
            .map(valueIn -> valueIn ?? loadingText);
    }
}

class AppScreenViewModel
{
    ...

    public final ObservableValue<String> dataLabelText;
    public final ObservableValue<String> versionLabelText;

    private final ObservableValue<DataStructure?> data = new ObservableValue<DataStructure?>(null);
    ...

    public AppScreenViewModel()
    {
        ...

        Task<DataStructure> fetchDataStructure = getDataStructureRequest(_httpClient)
            .map(response -> parseResponse<DataStructure>(response));

        fetchDataStructure.assignTo(data);
        
        dataLabelText = data
            .map(dataStructure -> dataStructure?.displayInfo)
            .loadedValue();

        versionLabelText = data
            .map(dataStructure -> dataStructure?.version.toString())
            .loadedValue();
    }
}

...

class AppScreen : View
{
    private TextView dataLabel;
    private TextView versionLabel;

    ...

    private void bindToViewModel(AppScreenViewModel viewModel)
    {
        subscriptions.add(viewModel.dataLabelText
            .subscribeWithInitial(text -> dataLabel.setText(text)));

        subscriptions.add(viewModel.versionLabelText
            .subscribeWithInitial(text -> versionLabel.setText(text)));
    }
}

Voila.

(By the way, I’m only calling ObservableValues “ObservableValue”s to avoid confusing them with Rx Observables. I believe they are what should properly be named Observable, and that’s what I would call them in a codebase that doesn’t import Rx)

This, I believe, achieves the declarative UI style we’re seeking, that avoids the need to manually trigger UI refreshes and ensures rendered data is never stale, and also avoids the pitfalls of Rx that are the result of improperly hiding multiple incompatible abstractions behind a single interface.

Where can you find an implementation of these concepts? Well, I’m working on it (I’m doing the first pass in Swift, and will follow with C#, Kotlin, C++ and maybe Java implementations), and maybe someone reading this will also start working on it. For the time being you can just build pieces you need when you need them. If you’re building UI, you can do what I’ve done several times and write a quick and dirty ObservableValue abstraction with map, flatMap and combine. You can even be lazy and make them all eagerly stored (it probably isn’t that inefficient to just compute them all eagerly, unless your app is really crazy sophisticated). You’ll get a lot of mileage out of that alone.

You can also continue to use Rx, as long as you’re strict about never using Observables to represent DataStreams or Tasks. They can work well enough as EventStreams, and Observables that derive from BehaviorSubjects work reasonably well as ObservableValues (until you need to read the value imperatively). But don’t use Rx as a replacement for asynchrony. Remember that you can create your own threads and block them as you please, and I promise the world isn’t going to end. If you have async/await, remember that it was always the right way to handle DataStreams and Tasks, but don’t try to force it to handle EventStreams or ObservableValues… producer-driven callbacks really are the right tool for those.

Follow these rules and you can even continue to use Rx and never tear your hair out trying to figure out what the “temperature” of your pipelines is.

On ReactiveX – Part III

Introduction

In the last part, we took a knife to Rx’s one-size-fits-all “Observable” abstraction, and carved out four distinct abstractions, each with unique properties that should not be hidden from whoever is using them.

The true value, I believe, of Rx is in its transformation operators… the almost endless zoo of options for how to turn one Observable into another one. The programming style Rx encourages is to build every stream you need through operators, instead of by creating Subjects and publishing to them “imperatively”.

So this raises the question: what happens to this rich (perhaps too rich, as the sheer volume of operators is headache-inducing) language of operators when we cleave the Observable into the EventStream, the DataStream, the ObservableValue and the Task?

What happens is that the operators also get divided, this time into two distinct categories. The first category is those operators that take one of the four abstractions and produce the same type of abstraction. They are, therefore, still transformations. They produce an EventStream from an EventStream, a DataStream from a DataStream, and so on. The second category is those operators that take one of the four abstractions and produce a different type of abstraction. These are not transformers but converters. We can, for example, convert an EventStream to a DataStream.

Transformations

First let’s talk about transformations. After dividing up the abstractions, we now need to divvy up the transformations that were originally available on Observable, by determining which ones apply to which of these more focused abstractions. We’ll find that some still apply in all cases, while others simply don’t make sense in all contexts.

We should first note that, like Observable itself, all four abstractions I identified retain the property of being generics with a single type parameter. Now, let’s consider the simplest transformation, map: a 1-1 conversion transformation that can change that type parameter. This transformation continues to apply to all four abstractions.

We can map an EventStream to an EventStream: this creates a new EventStream, which subscribes to its source EventStream, and for each received event, applies a transformation to produce a new event of a new type, and then publishes it. We can map a DataStream to a DataStream: this creates a new DataStream, where every time we consume one of its values, it first consumes a value of its source DataStream, then applies a transformation and returns the result to us. We can map an ObservableValue: this creates a new ObservableValue whose value is, at any time, the provided transformation of the source ObservableValue (this means it must be read only. We can’t manually set the derived ObservableValue’s value without breaking this relationship). It therefore updates every time the source ObservableValue updates. We can map a Task to a Task: this is a Task that performs its source Task, then takes the result, transforms it, and returns it as its own result.

We also have the flatMap operator. The name is confusing, and derives from collections: a flatMap of a Collection maps each element to a Collection, then takes the resulting Collection of Collections and flattens it into a single Collection. Really, this is a compound operation that first does a map, then a flatten. The generalization of this transformation is that it takes an X of X of T, and turns it into an X of T.

The flatten operator, and therefore flatMap, also continues to apply to all of our abstractions. How do we turn an EventStream of EventStreams of T into an EventStream of T? By subscribing to each inner EventStream as it is published by the outer EventStream, and publishing its events to a single EventStream. The resulting EventStream receives all the events from all the inner EventStreams as they become available. How do we turn a DataStream of DataStreams of T into a DataStream of T? When we consume a value, we consume a single DataStream from the outer DataStream, store it, and then supply each value from it on each consumption, until it runs out; then we consume the next DataStream from the outer DataStream, and repeat. How do we turn an ObservableValue of an ObservableValue of T into an ObservableValue of T? By making the current value the current value of the current ObservableValue of the outer ObservableValue (which then updates every time either the outer or inner ObservableValue updates). How do we turn a Task that produces a Task that produces a T into a Task that produces a T? We run the outer Task, then take the resulting inner Task, run it, and return its result.
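Here is a sketch of flatten and flatMap for the Task case, using a hypothetical synchronous Task class to keep the example self-contained (a real Task would run asynchronously):

```typescript
// Hypothetical Task: a unit of work that produces one result when run.
class Task<T> {
  constructor(private work: () => T) {}

  run(): T { return this.work(); }

  map<R>(transform: (result: T) => R): Task<R> {
    return new Task(() => transform(this.run()));
  }

  // flatMap = map + flatten: run this task, then run the task it produced,
  // and return the inner task's result.
  flatMap<R>(transform: (result: T) => Task<R>): Task<R> {
    return new Task(() => transform(this.run()).run());
  }
}

// flatten: a Task that produces a Task becomes a single Task.
function flatten<T>(outer: Task<Task<T>>): Task<T> {
  return new Task(() => outer.run().run());
}
```

This mirrors how, say, a parse step that itself requires more work composes into one sequential unit of work.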

In Rx land, it was realized that flatten actually has another variation that didn’t come up with ordinary Collections. Each time a new inner Observable is published, we can either continue to observe the older inner Observables, or stop observing the old one and switch to the new one. This is a slightly different operator called switch, and it leads to the combined operator switchMap. For us, this variation applies only to EventStream, because the concept depends on being producer-driven: streams publish values of their own accord, and we must decide whether to keep listening for them. DataStreams are consumer-driven, so flatMap must get to the end of one inner DataStream before moving to the next. ObservableValue and Task don’t involve multiple values, so the concept doesn’t apply there.

Now let’s look at another basic transformation: filter. Does this apply to all the abstractions? No... because filtering is inherently about multiple values: some get through, some don’t. But only two of our four abstractions involve multiple values: EventStream and DataStream. We can therefore meaningfully filter those. But ObservableValue and Task? Filtering makes no sense there, because there’s only one value or result. Any other transformations that inherently involve multiple values (filter is just one, others include buffer or accumulate) therefore only apply to EventStream and DataStream, but not ObservableValue or Task.
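A sketch of filter on a hypothetical minimal EventStream, the producer-driven case:

```typescript
// Hypothetical minimal EventStream: producer-driven publish/subscribe.
class EventStream<T> {
  private subscribers: Array<(event: T) => void> = [];

  subscribe(callback: (event: T) => void): void {
    this.subscribers.push(callback);
  }

  publish(event: T): void {
    this.subscribers.forEach((cb) => cb(event));
  }

  // filter: republish only the events that pass the predicate.
  // Meaningful precisely because a stream has multiple values.
  filter(predicate: (event: T) => boolean): EventStream<T> {
    const out = new EventStream<T>();
    this.subscribe((e) => { if (predicate(e)) out.publish(e); });
    return out;
  }
}
```

The DataStream version is the pull-based mirror image: keep consuming from the source until a value passes the predicate, then return it.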

Another basic operator is combining: taking multiple abstractions and combining them into a single one. If we have an X of T1 and an X of T2, we may be able to combine them into a single X of a combined T1 and T2 (i.e. a (T1, T2) tuple). Or, if we have a Collection of Xs of Ts, we can combine them into a single X of a Collection of Ts, or possibly a single X of T. Can this apply to all four abstractions? Yes, but we’ll see that for abstractions that involve multiple values, there are multiple ways to “combine” their values, while for the ones that involve only a single value, there’s only one way to “combine” their values.

That means for ObservableValue and Task, there’s one simple combine transformation. A combined ObservableValue is one whose value is the tuple/collection made up of its sources’ values, and therefore it changes when any one of its source values changes. A combined Task is one that runs each one of its source Tasks in parallel, waits for them all to finish, then returns the combined results of all as its own result (notice that, since Task is fundamentally concerned with execution order, this becomes a key feature of one of its transformations).

With EventStream and DataStream, there are multiple ways in which we can combine their values. With EventStream, we can wait for every source to publish one value, at which point we publish the first combined value; we store the latest value from each source, and each time any source publishes, we replace just that source’s value, keep the rest, and publish the new combination. This is the combineLatest operator: each published event represents the most recently published events from each source stream. Alternatively, we can wait for each source to publish once, publish the combination, discard the stored values, and wait for all sources to publish again before combining and publishing again. This is the zip operator.
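Here is how combineLatest and zip might look on a hypothetical minimal EventStream. Note how combineLatest re-publishes stored latest values, while zip pairs values strictly one-for-one:

```typescript
// Hypothetical minimal EventStream (producer-driven publish/subscribe).
class EventStream<T> {
  private subscribers: Array<(event: T) => void> = [];
  subscribe(cb: (event: T) => void): void { this.subscribers.push(cb); }
  publish(event: T): void { this.subscribers.forEach((cb) => cb(event)); }
}

// combineLatest: once both sources have published, re-publish the pair of
// most recent values whenever either source publishes.
function combineLatest<A, B>(a: EventStream<A>, b: EventStream<B>): EventStream<[A, B]> {
  const out = new EventStream<[A, B]>();
  let lastA: A | undefined; let hasA = false;
  let lastB: B | undefined; let hasB = false;
  a.subscribe((v) => { lastA = v; hasA = true; if (hasB) out.publish([v, lastB!]); });
  b.subscribe((v) => { lastB = v; hasB = true; if (hasA) out.publish([lastA!, v]); });
  return out;
}

// zip: pair values strictly one-for-one, queueing whichever source is ahead.
function zip<A, B>(a: EventStream<A>, b: EventStream<B>): EventStream<[A, B]> {
  const out = new EventStream<[A, B]>();
  const pendingA: A[] = []; const pendingB: B[] = [];
  const tryEmit = () => {
    if (pendingA.length > 0 && pendingB.length > 0) {
      out.publish([pendingA.shift()!, pendingB.shift()!]);
    }
  };
  a.subscribe((v) => { pendingA.push(v); tryEmit(); });
  b.subscribe((v) => { pendingB.push(v); tryEmit(); });
  return out;
}
```

Both combinators are driven entirely by when the sources publish, which is exactly why only one of them survives the move to consumer-driven DataStreams.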

combineLatest doesn’t make sense for DataStream, though, because it is based on when each source stream publishes. The “latest” in combineLatest refers to the timing of the source stream events. Since DataStreams are consumer-driven, there is no timing; the DataStream is simply asked to produce a value by a consumer. Therefore the only combination is the one where, when a consumer consumes a value, the combined DataStream consumes a value from each of its sources, combines them, and returns the result. This is the zip operator, which continues to apply to DataStream.

Both EventStream and DataStream also offer ways to combine multiple streams into a single stream of the same type. With EventStream, this is simply the stream that subscribes to multiple sources and publishes when it receives a value from any of them. This is the merge operator. The order in which a merged EventStream publishes is dictated by the order it receives events from its source streams. We can do something similar with DataStream, but since DataStreams are consumer-driven, the transformation has to decide which source to consume from. A merge would be for the DataStream to first consume all the values of the first source stream, then all the values of the second one, and so on (thus making it equivalent to a merge on a Collection… we could also call this concatenate to avoid ambiguity). We can also do a roundRobin: each time we consume a value, the combined stream consumes one from a particular source, then one from the next one, and so on, wrapping back around after it reaches the end. There are all sorts of ways we can decide how to pick the order of consumption, and a custom algorithm can probably be plugged in as a Strategy to a transformation.
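A sketch of concatenate and roundRobin on a hypothetical pull-based DataStream. Using null to signal exhaustion is a simplification for the sketch:

```typescript
// Hypothetical consumer-driven DataStream: values are pulled one at a time;
// null signals exhaustion (a simplification for this sketch).
interface DataStream<T> {
  next(): T | null;
}

function streamOf<T>(...items: T[]): DataStream<T> {
  let i = 0;
  return { next: () => (i < items.length ? items[i++] : null) };
}

// concatenate (the pull-based "merge"): drain each source fully, in order.
function concatenate<T>(...sources: Array<DataStream<T>>): DataStream<T> {
  let index = 0;
  return {
    next: () => {
      while (index < sources.length) {
        const v = sources[index].next();
        if (v !== null) return v;
        index++;  // this source is exhausted; move to the next
      }
      return null;
    },
  };
}

// roundRobin: consume one value from each source in turn, dropping
// exhausted sources as they run out.
function roundRobin<T>(...sources: Array<DataStream<T>>): DataStream<T> {
  const active = [...sources];
  let i = 0;
  return {
    next: () => {
      while (active.length > 0) {
        if (i >= active.length) i = 0;
        const v = active[i].next();
        if (v !== null) { i++; return v; }
        active.splice(i, 1);  // drop the exhausted source
      }
      return null;
    },
  };
}
```

The consumption-order decision lives entirely inside the combinator, which is what would make it pluggable as a Strategy.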

Somewhat surprisingly, I believe that covers it for ObservableValue and Task, with one exception (see below): map, flatten and combine are really the only transformations we can meaningfully do with them, because all other transformations involve either timing or value streams. Most of the remaining transformations from Rx we haven’t talked about will still apply to both EventStream and DataStream, but there are some important ones that only apply to one or the other. Any transformations that involve order apply only to DataStream, for example append or prepend. Any transformations that are driven by timing of the source streams apply only to EventStream, for example debounce or delay. And some transformations are really not transformations but conversions.

The exception I mentioned is for ObservableValue. EventStreams are “hot”, and their transformations are “eager”, and it never makes sense for them to not be (in the realist interpretation of events and “observing”). Derived ObservableValues, however, can be “eager” or “lazy”, and both are perfectly compatible with the abstraction. If we produce one ObservableValue from a sequence of transformations (say, maps and combines) on other ObservableValues, then we can choose to either perform those transformations every time someone reads the value, or we can choose to store the value, and simply serve it up when asked.

I believe the best way to implement this is to have derived ObservableValues be lazy by default: their values get computed from their sources on each read. This also means when there are subscribers to updates, they must subscribe to the source values’ updates, then apply the transformations each time new values are received by the sources. But sometimes this isn’t the performance we want. We might need one of those derived values to be fast and cheap to read. To do that, we can provide the cache operator. This takes an ObservableValue, and creates a new one that stores its value directly. This also requires that it eagerly subscribe to its source value’s updates and use them to update the stored value accordingly. There is also an issue of thread safety: what if we want to read a cached ObservableValue from one thread but it’s being written to on another thread? To handle this we can allow the cache operator to specify how (using a Scheduler) the stored value is updated. These issues of caching and thread safety are unique to observable values.
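A sketch of the lazy-by-default derived value and the cache operator. This is single-threaded, so the Scheduler and thread-safety concerns mentioned above are elided, and the names are hypothetical:

```typescript
// An ObservableValue is something with a readable current value and updates.
interface ObservableValue<T> {
  readonly value: T;
  subscribe(callback: (value: T) => void): void;
}

class MutableObservableValue<T> implements ObservableValue<T> {
  private subscribers: Array<(value: T) => void> = [];
  constructor(private current: T) {}
  get value(): T { return this.current; }
  set value(next: T) {
    this.current = next;
    this.subscribers.forEach((cb) => cb(next));
  }
  subscribe(callback: (value: T) => void): void { this.subscribers.push(callback); }
}

// Lazy by default: the transformation runs on every read, and subscribers
// get transformed source updates as they arrive.
function mapLazy<S, T>(source: ObservableValue<S>, transform: (v: S) => T): ObservableValue<T> {
  return {
    get value() { return transform(source.value); },
    subscribe: (cb) => source.subscribe((v) => cb(transform(v))),
  };
}

// cache: eagerly subscribe to the source and store the latest value,
// making reads cheap at the cost of an always-active subscription.
function cache<T>(source: ObservableValue<T>): ObservableValue<T> {
  const stored = new MutableObservableValue(source.value);
  source.subscribe((v) => { stored.value = v; });
  return stored;
}
```

The test below demonstrates the trade-off: the lazy value recomputes on every read, while the cached one tracks the source eagerly.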

Converters

Now let’s talk about how we can turn one of these four abstractions into another of the four. In total that would be 12 possible converters, assuming there’s a useful or sensible way to convert each abstraction into each of the others.

Let’s start with EventStream as the source.

What does it mean to convert an EventStream into a DataStream? This means we’re taking a producer-driven stream and converting it into a consumer-driven stream. Remember the key distinction is that EventStreams are defined by timing: the events happen when they do, and subscribers are either subscribed at the time or they miss them. DataStreams are defined by order: the data are returned in a specific sequence, and it’s not possible to “miss” one (you can skip one, but that’s a conscious choice of the consumer). Thus, turning an EventStream into a DataStream is fundamentally about storing events as they are received until they are later consumed, ensuring that none can be missed. It is, therefore, buffering. For this reason, this conversion operator is called buffer. It internally builds up a Collection of events received from the EventStream, and when a consumer consumes a value, the first element in the collection is returned immediately. If the Collection is empty, the consumer will be blocked until the next event is received.
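A sketch of buffer. Since this single-threaded example can’t block, an empty buffer returns null where a real implementation would block (or await) the next event:

```typescript
// Hypothetical minimal EventStream (producer-driven).
class EventStream<T> {
  private subscribers: Array<(event: T) => void> = [];
  subscribe(cb: (event: T) => void): void { this.subscribers.push(cb); }
  publish(event: T): void { this.subscribers.forEach((cb) => cb(event)); }
}

// Hypothetical consumer-driven DataStream; null stands in for "would block".
interface DataStream<T> {
  next(): T | null;
}

// buffer: store events as they arrive so a consumer can pull them later,
// in order, missing nothing.
function buffer<T>(source: EventStream<T>): DataStream<T> {
  const queue: T[] = [];
  source.subscribe((e) => queue.push(e));
  return { next: () => (queue.length > 0 ? queue.shift()! : null) };
}
```

The key property is that events published before anyone consumes are retained rather than lost.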

What does it mean to convert an EventStream to an ObservableValue? It would mean we’re storing the latest event emitted by the stream, so we can query what it is at any time. We call this converter cacheLatest. Note that the latest event must be cached, or else we wouldn’t be able to read it on demand. That’s fundamentally what this converter is doing: taking transient events that are gone right after they occur, and making them persistent values that can be queried as needed. This can be combined with other transformations on EventStream to produce some useful derived converters. For example, if we apply the accumulate operator to the EventStream, then use cacheLatest to produce an ObservableValue, the result is an accumulateTo operator, which stores a running accumulation (perhaps a sum) of incoming values over time.
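A sketch of cacheLatest, plus the derived accumulateTo converter built the same way (names and shapes are hypothetical):

```typescript
// Hypothetical minimal EventStream (producer-driven).
class EventStream<T> {
  private subscribers: Array<(event: T) => void> = [];
  subscribe(cb: (event: T) => void): void { this.subscribers.push(cb); }
  publish(event: T): void { this.subscribers.forEach((cb) => cb(event)); }
}

// Bare-bones ObservableValue: just a queryable current value for this sketch.
class ObservableValue<T> {
  constructor(public value: T) {}
}

// cacheLatest: persist each transient event as the current, queryable value.
function cacheLatest<T>(source: EventStream<T>, initial: T): ObservableValue<T> {
  const cached = new ObservableValue(initial);
  source.subscribe((e) => { cached.value = e; });
  return cached;
}

// accumulateTo: a derived converter, equivalent to accumulate + cacheLatest.
function accumulateTo<T, A>(
  source: EventStream<T>, initial: A, fold: (acc: A, event: T) => A
): ObservableValue<A> {
  const cached = new ObservableValue(initial);
  source.subscribe((e) => { cached.value = fold(cached.value, e); });
  return cached;
}
```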

What does it mean to convert an EventStream to a Task? Well, basically it would mean we create a Task that waits for one or more events to be published, then returns them as the result. But as we will see soon, it makes more sense to create “wait for the next value” Tasks out of DataStreams, and we can already convert EventStreams to DataStreams with buffer. Therefore, this converter would really be a compound conversion of first buffering and then fetching the next value. We can certainly write it as a convenience function, but under the hood it’s just composing other converters.

Now let’s move to DataStream as the source.

What does it mean to convert a DataStream to an EventStream? Well, an EventStream publishes its own events, but a DataStream only returns values when a consumer consumes them. Thus, turning a DataStream into an EventStream involves setting up a consumer to immediately start consuming values, and publish them as soon as they are available. The result is that multiple observers can now listen for those values as they get published, with the caveat that they’ll miss any values if they don’t subscribe at the right time. We can call this conversion broadcast.

What does it mean to convert a DataStream to an ObservableValue? Nothing useful or meaningful, as far as I can tell. Remember, the meaning of converting an EventStream to an ObservableValue was to cache the latest value. That’s a reference to timing. But timing in a DataStream is controlled by the consumer, so all this could mean is that a consumer powers through all the values and saves them to an ObservableValue. The result is a rapidly changing value that then gets stuck on the last value in the DataStream. That doesn’t appear to be a valid concept.

What does it mean to convert a DataStream to a Task? Simple: read the next value! In fact, in .NET, where Task is the return value of all async functions, the return value of the async read method would then have to be a Task. There can, of course, be other related functions to read more than one value. We can also connect an input DataStream to an output DataStream (which, remember, is a valid abstraction but not one carved out from Observable, which only represents sources and not sinks), which results in a Task whose work is to consume values from the input and send them to the output.
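A sketch of the read-next-value conversion, wrapping one consumption of a hypothetical DataStream in a Task (synchronous here for self-containment; in .NET this would naturally be an async method returning a Task):

```typescript
// Hypothetical Task: a unit of work producing one result when run.
class Task<T> {
  constructor(private work: () => T) {}
  run(): T { return this.work(); }
}

// Hypothetical consumer-driven DataStream.
interface DataStream<T> {
  next(): T;
}

// readNext: package "consume one value" as a piece of work. Each returned
// Task consumes exactly one value when run.
function readNext<T>(source: DataStream<T>): Task<T> {
  return new Task(() => source.next());
}
```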

Now let’s move to ObservableValue as the source.

What does it mean to convert an ObservableValue to an EventStream? Simple: publish the updates! Now, part of an ObservableValue’s features is being able to subscribe to updates. How is this related to publishing updates? When we subscribe to an ObservableValue, the subscription is lazy (for derived ObservableValues, the transformations are applied to source values as they arrive), and we have the option of notifying our subscriber immediately with the current value. But when we produce an EventStream from an ObservableValue, remember EventStreams are always hot! It must eagerly subscribe to the ObservableValue and then publish each update it receives. This is significant for derived lazy ObservableValues, because as long as someone holds onto an EventStream produced from one, it has an active subscription and therefore its value is being calculated, which it wouldn’t be if no one was subscribed to it. We can call this converter publishUpdates: it basically ensures that updates are eagerly computed and broadcast so that anyone can observe them like any other EventStream.

What does it mean to convert an ObservableValue to a DataStream? Nothing useful or meaningful that I can think of. At best it would be a stream that lets us read the updates, but that’s just publishUpdates followed by buffer.

What does it mean to convert an ObservableValue to a Task? Again, I can’t think of anything useful or meaningful, that wouldn’t be a composition of other converters.

Now let’s move to Task as the source.

Tasks don’t convert to any of the other abstractions in any meaningful way, because they have only a single return value. Even ObservableValues, which fundamentally represent single values, still have an associated stream of updates. Tasks don’t even have this. For this reason, we can’t derive any kind of meaningful stream from a Task, which means there’s also nothing to observe.

The converters are summarized in the following table (the row is the input, the column is the output):

                     EventStream      DataStream   ObservableValue   Task
  EventStream        N/A              buffer       cacheLatest       none
  DataStream         broadcast        N/A          none              read
  ObservableValue    publishUpdates   none         N/A               none
  Task               none             none         none              N/A

There are, in total, only five valid converters.

Conclusion

After separating out the abstractions, we find that the humongous zoo of operators attached to Observable is tidied up into more focused groups of transformations on each of the four abstractions, plus (where applicable) ways to convert from one to the other. What this reveals is that the places where an Rx-driven app creates deep confusion over “hotness/coldness” and side effects are areas where an Observable really represents one of these four abstractions but is combined with a transformation operation that does not apply to that abstraction. For example, one true event stream (say, of mouse clicks or other user gestures) appended to another one makes no sense. Nor does trying to merge two observable values into a stream based on which one changes first.

In the final part, we’ll revisit the example app from the first part, rewrite it with our new abstractions and escape the clutches of Rx Hell.

On ReactiveX – Part II

In the last part, we explored the bizarre world of extreme observer-dependence that gets created in a ReactiveX (Rx)-driven app, and how that world rapidly descends into hell, especially when it is applied as a blanket solution to every problem.

Is the correct reaction to all of this to say “Screw Rx” and be done with it? Well, not entirely. The part where we try to cram every shape of peg into a square hole, we should absolutely say “to hell” with that. Whenever you see library tutorials say any variant of “everything is an X”, you should back away slowly, clutching whatever instrument of self-defense you carry. The only time that statement is true is if X = thing. Yes, everything is a thing… and that’s not very insightful, is it? The reason “everything is an X” with some more specific X seems profound is because it’s plainly false, and you have to imagine some massive change in perception for it to possibly be true.

Rx’s cult of personality cut its way through the Android world a few years ago, and now most of its victims have sobered up and moved on. In what is a quintessentially Apple move, Apple invented their own, completely proprietary and incompatible version of Rx, called Combine, a couple of years ago, and correspondingly the “everything is a stream” drug is making its rounds through the iOS world. It, too, shall pass. A large part of what caused RxJava to wane is Kotlin coroutines, and with Swift finally gaining async/await, Combine will subside as well. Why do these “async” language features replace Rx? Because Rx was touted as the blanket solution to concurrency.

Everything is not an event stream, or an observable, period. Some things are. Additionally, the Rx Observable is a concept with far too much attached to it. It is trying to be so many things at once, owing to the fact that it’s trying to live up to the “everything is a me” expectation. That can only result in Observable becoming a synonym for Any, except that instead of doing what the most general, highest category should do (namely, nothing), it endeavors to do the opposite: everything. It’s a God object in the making. That’s why it ends up everywhere in your code, and gradually erodes all the information a robust type system is supposed to communicate.

But is an event stream, endeavoring only to be an event stream, with higher-order quasi-functional transformations, a useful abstraction? I believe it’s a massively useful one. I still use it for user interfaces, but I reluctantly do so with Rx’s version of one, mostly because it’s the best one available.

The biggest problem with Rx is that its central abstraction is really several different abstractions, all crammed together. After thinking about this for a while, I have identified four distinct concepts that have been merged under the umbrella of the Observable interface. By disentangling these from each other, we can start to rebuild more focused, well-formed libraries that aren’t infected with scope creep.

These are the four abstractions I have identified:

  • Event streams
  • Data streams
  • Observable values
  • Tasks

Let’s talk about each one, what they are (and just as importantly, what they aren’t), and how they are similar to and different from Rx’s Observable.

Event Streams

Let us step back from the mind-boggling solipsism of extreme Copenhagen interpretation, where the world around us is brought into being by observing it, and return to classical realism, where objective reality exists independent of observation. Observation simply tells us what is out there. An event stream is literally a stream of events: things that occur in time. Observing is utterly passive. It does not, in any way, change what events occur. It merely signs one up to be notified when they do.

The Rx Observable practically commands the Copenhagen outlook by making subscribe the abstract method, to be overridden by the various subclasses returned by operators. It is what, exactly, subscribing (a synonym for observing) means that varies with different types of streams. This is where the trouble starts. It sets us up to have subscribe be what controls publish.

A sane approach to an event stream is for the subscribe method to be final. Subscribing is what it is: it just adds a callback to the list of callbacks to be triggered when an event is published. It should not alter what is published. The interesting behavior should occur exclusively in the constructor of a stream.

Let us recall the original purpose of the Observer Pattern. The primary purpose is not really to allow one-to-many communication. That’s a corollary of its main purpose. The main purpose is to decouple the endpoints of communication, specifically to allow one object to send messages to another object without ever knowing about that other object, not even the interfaces it implements.

Well, this is no different than any delegation pattern. I can define a delegate in class A, then have class B implement that delegate, allowing A to communicate with B without knowing about B. So what is it, specifically, about the Observer pattern that loosens the coupling even more than this?

The answer is that the communication is strictly one way. If an A posts an event, and B happens to be listening, B will receive it, but cannot (without going through some other interface that A exposes) send anything back to A, not even a return value. Essentially, all the methods in an observer interface must have void returns. This is what makes one-to-many broadcasting a trivial upgrade to the pattern, and why you typically get it for free. Broadcasting with return values wouldn’t make sense.

The one-way nature of the message flow creates an objective distinction between the publisher (or sender) and the subscriber (or receiver). The intermediary that moves the messages around is the channel, or broker. This is distinct from, say, the Mediator Pattern, where the two ends of communication are symmetric. An important consequence of the asymmetry of observers is that the presence of subscribers cannot directly influence the publisher. In fact, the publisher in your typical Observer pattern implementation can’t even query who is a subscriber, or even how many subscribers there are.

A mediator is like your lawyer talking to the police. An observer is like someone attending a public speech you give, where the “channel” is the air carrying the sound of your voice. What you say through your lawyer depends on what questions the police ask you. But the speech you give doesn’t depend on who’s in the audience. The speaker is therefore decoupled from his audience to a greater degree than you are decoupled from the police questioning you.

By moving the publishing behavior into subscribe, Rx is majorly messing with this concept. It muddles the distinction between publisher/sender and subscriber/receiver, by allowing the subscribe/receive end of the chain to significantly alter what the publisher/sender side does. It’s this stretching of the word “observe” to mean something closer to “discuss” that can cause confusion like “why did that web request get sent five times?”. It’s because what we’re calling “observing a response event” is more like “requesting the response and waiting for it to arrive”, which is a two-way communication.

We should view event streams as a higher abstraction level for the Observer Pattern. An EventStream is just a wrapper around a channel, that encapsulates publishing and defines transformation operators that produce new EventStreams. The publishing behavior of a derived stream is set up at construction of the stream. The subscribe method is final. Its meaning never changes. It simply forwards a subscribe call to the underlying channel.
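The design described here can be sketched in a few lines. This is a minimal illustration with invented names (Channel, EventStream, EventSource), not any real library’s API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// The channel/broker: plain Observer-pattern machinery. Publishing is
// strictly one-way; subscribers can send nothing back.
class Channel<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(T event) {
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(event);
        }
    }
}

// An event stream wraps a channel. subscribe is final: it only registers a
// callback, and can never change what gets published.
class EventStream<T> {
    protected final Channel<T> channel = new Channel<>();

    public final void subscribe(Consumer<T> subscriber) {
        channel.subscribe(subscriber);
    }

    // Eager map: the derived stream hooks itself to the source at
    // construction, so mapped events are produced whether or not anyone
    // ever subscribes to the derived stream.
    public <U> EventStream<U> map(Function<T, U> transform) {
        EventStream<U> derived = new EventStream<>();
        this.subscribe(event -> derived.channel.publish(transform.apply(event)));
        return derived;
    }
}

// A root stream whose events come from something concrete (a button, a sensor...).
class EventSource<T> extends EventStream<T> {
    public void emit(T event) {
        channel.publish(event);
    }
}
```

Note that the only "interesting" behavior in a derived stream happens in map, at construction time. Events emitted before anyone subscribes are simply missed, as the next paragraph describes.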

Event streams are always “hot”. If the events occur, they are published, if not, they aren’t. The transformation operations are eager, not lazy. The transform in map is evaluated on each event as soon as the event is published, independent of subscribers. This expresses the realism of this paradigm: those mapped events happen, period. Subscribing doesn’t make them happen, it just tells us about them. The way we handle whether derived streams continue to publish their derived events is by holding onto the stream. If a derived stream exists, it is creating and publishing derived events. If we want the derived events to stop firing, we don’t throw away subscriptions, we throw away the stream itself.

There’s no problem of duplication here. The subscribing is one-to-many, but the construction of the events, the only place where any kind of side effects can occur, is tied to the construction of derived streams, which only happens once. One stream = one instance of each event. The other side of that coin is that missed events are missed, period. If you want any kind of caching behavior, that’s not an event stream. It’s something else.

I think we’ll also find that by separating out the other concepts we’ll get to next, the need to ever create event streams that have any side effects is reduced to essentially zero.

Rx streams have behavior for handling the stream “completing”, and handling exceptions that get thrown during construction of an item to be emitted. I have gone back and forth over whether it makes sense for a strict event stream to have a notion of “completing”. I lean more toward thinking it doesn’t, and that “completion” applies strictly to the next concept we’ll talk about.

What definitely does not make sense for event streams is failure. Event streams themselves can’t “fail”. Events happen or they don’t. If some exception gets thrown by a publisher, it’s a problem for the publisher: it will either be trapped by the publisher, kill the publisher, or kill the process. Having it propagate to subscribers, and especially having it (by design) terminate the whole stream, doesn’t make sense.

Data Streams

The next concept is a data stream. How are “data” streams different from “event” streams? Isn’t an event just some data? Well, an event holds data, but the event is the occurrence itself. With data streams, the items are not things that occur at a specific time. They may become available at a specific time, but that time is otherwise meaningless. The only significance of the arrival time of a datum is that we have to wait for it.

More importantly, in a stream of data, every datum matters. It’s really the order, not the timing, of the items that’s important. It’s critical that someone reading the data stream receive every element in the correct order. If a reader wants to skip some elements, that’s his business. But it wouldn’t make sense for a reader to miss elements and not know it.

We subscribe to an event stream, but we consume a data stream. Subscribing is passive. It has no impact on the events in the stream. Consuming is active. It is what drives the stream forward. The “next” event in a stream is emitted whenever it occurs, independent of who is subscribed. The “next” event of a data stream is emitted when the consumer decides to consume it. In both cases, once an element is emitted, it is never re-emitted.

Put succinctly, an event stream is producer-driven, and a data stream is consumer-driven. An event stream is a push stream, and a data stream is a pull stream.

This means a data stream cannot be one-to-many. An event stream can have arbitrarily many subscribers, only because subscribing is passive; entirely invisible to the publisher. But a data stream cannot have multiple simultaneous consumers. If we passed a data stream to multiple consumers who tried to read at the same time, they would step on each others’ toes. One would consume a datum and cause the other one to miss it.

To clarify, we’re talking about a specific data stream we call an input stream. It produces values that a consumer consumes. The other type of data stream is an output stream, which is a consumer itself, rather than a producer. Output streams are a separate concept not related to Rx Observables, because Observables are suppliers, not consumers (consumers in Rx are called Subscribers).

Most languages already have input and output stream classes, but they aren’t generic. Their element type is always bytes. We can define a generic one like this:

interface InputStream<Element>
{
    boolean hasMore();

    Element read();

    int skip(int count);
}

This time it’s a pure interface. There’s no default behavior. Different types of streams have to define what read means.

Data streams can be transformed in ways similar to event streams. But since the “active” part of a data stream is the reading, it is here that a derived stream will interact with its source stream. This will look more like how Rx Observable implements operators. The read method will be abstract, and each operator, like map and filter, will implement read by calling read on the source stream and applying the transform. In this case, the operators are lazy. The transform is not applied to a datum until a consumer consumes the mapped stream.
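The lazy operators described above can be sketched in Java. This is an illustration, renamed PullStream to avoid colliding with java.io.InputStream, with skip omitted and a convenience source added for demonstration:

```java
import java.util.Iterator;
import java.util.function.Function;

// A pull-based generic data stream. The consumer drives the flow.
interface PullStream<T> {
    boolean hasMore();

    T read(); // may block until the next datum is available

    // Lazy map: nothing happens until the consumer actually reads. Each read
    // of the derived stream pulls one datum from the source and transforms it.
    default <U> PullStream<U> map(Function<T, U> transform) {
        PullStream<T> source = this;
        return new PullStream<U>() {
            public boolean hasMore() { return source.hasMore(); }
            public U read() { return transform.apply(source.read()); }
        };
    }

    // A demonstration source backed by an iterator.
    static <T> PullStream<T> from(Iterator<T> iterator) {
        return new PullStream<T>() {
            public boolean hasMore() { return iterator.hasNext(); }
            public T read() { return iterator.next(); }
        };
    }
}
```

Contrast this with the event stream: read returns a result instead of taking a callback, and every element is delivered, in order, to the one consumer pulling on the stream.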

The obvious difference between this and Rx Observables is that this is a pull, rather than push, interface. The read method doesn’t take a callback, it returns a result. This is exactly what we want for a stream where the next value is produced by the consumer requesting it. A data stream is inherently a pull paradigm. A push-style interface just obscures this. Typical needs with data streams, for example reading “n” values, then switching to do other stuff and then returning to read some more, become incredibly convoluted with an interface designed for a stream where the producer drives the flow.

A pull interface requires that if the next datum isn’t available yet, the thread must block. This is the horror that causes people to turn everything into callbacks: so they never block threads. The phobia of blocking threads (which is really a phobia of creating your own threads that can be freely blocked without freezing the UI or starving a thread pool) is a topic for another day. For the sake of argument I’ll accept that it’s horrible and we must do everything to avoid it.

The proper solution to the problem of long-running methods with return values that don’t block threads is not callbacks. Callback hell is the price we pay for ever thinking it was, and Rx hell is really a slight variation of callback hell with even worse problems layered on top. The proper solution is coroutines, specifically async/await.

This is, of course, exactly how we’d do it today in .NET, or any other language that has coroutines. If you’re stuck with Java, frankly I think you should just let the thread block, and make sure you do the processing on a thread you created (not the UI thread). That is, after all, exactly how Java’s InputStream works. If you are really insistent on not blocking, use a Future. That allows consuming with a callback, but it at least communicates in some way that you only expect the callback to be called once. That means you get a Future each time you read a chunk of the stream. If that seems ugly/ridiculous to you, then just block the damn thread!

Data streams definitely have a notion of “completing”. Their interface needs to be able to tell a consumer that there’s nothing left to consume. How does it handle errors? Well, since the interface is synchronous, an exception thrown by a transformation will propagate to the consumer. It’s his business to trap it and decide how to proceed. It should only affect that one datum. It should be possible to continue reading after that. If an intermediate derived stream doesn’t deal with an exception thrown by a source stream, it will propagate through until it gets to an outer stream that handles it, or all the way out to the consumer. This is another reason why a synchronous interface is appropriate. It is exactly what try-catch blocks do. Callback interfaces require you to essentially try-catch on every step, even if a step actually doesn’t care about (and cannot handle) an error and simply forwards it. You know you hate all that boilerplate. Is it really worth all of that just to not block a thread?
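The error flow described above is just ordinary exception propagation. A small self-contained sketch (Integer.parseInt stands in for a transform that can fail):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// With a synchronous pull interface, a bad datum surfaces as an ordinary
// exception at the read call site. The consumer traps it with a plain
// try-catch, and reading simply continues with the next datum.
class StreamErrorDemo {
    static List<Integer> parseAll(Iterator<String> source) {
        List<Integer> parsed = new ArrayList<>();
        while (source.hasNext()) {
            try {
                // The "transform" step; throws on malformed input.
                parsed.add(Integer.parseInt(source.next()));
            } catch (NumberFormatException error) {
                // Only this one datum is affected; the stream stays readable.
            }
        }
        return parsed;
    }
}
```

No intermediate step needed any error-forwarding boilerplate; the exception travels from the point of failure to the nearest interested catch block on its own.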

(If I was told I simply cannot block threads I’d port the project to Kotlin before trying to process data streams with callbacks)

Observable Values

Rx named its central abstraction Observable. This made me think if I create an Observable<String>, it’s just like a regular String, except I can also subscribe to be notified when it changes. But that’s not at all what it is. It’s a stream, and streams aren’t values. They emit values, but they aren’t values themselves. What’s the difference, exactly? Well, if I had what was literally an observable String, I could read it, and get a String. But you can’t “read” an event stream. An event stream doesn’t have a “current value”. It might have a most recently emitted item, but those are, conceptually, completely different.

Unfortunately, in its endeavor toward “everything is me”, Rx provides an implementation of Observable whose exact purpose is to try to cram these two orthogonal concepts together: the BehaviorSubject. It is a literal observable value. It can be read to get its current value. It can be subscribed to, to get notified whenever the value changes. It can be written to, which triggers the subscribers.

But since it implements Observable, I can pass it along to anything that expects an Observable, thereby forgetting that it’s really a BehaviorSubject. This is where it advertises itself as a stream. You might think: well it is a stream, or rather changes to the value are a stream. And that is true. But that’s not what you’re subscribing to when you subscribe to a BehaviorSubject. Subscribing to changes would mean you don’t get notified until the next time the value gets updated. If it never changes, the subscriber would never get called. But subscribers to a BehaviorSubject always get called immediately with the current value. If all you know is that you’ve got an Observable, you’ll have no idea if this will happen or not.

Once you’ve upcast to an Observable, you lose the ability to read the current value. To preserve this, you’ll have to expose it as a BehaviorSubject. The problem then becomes that this exposes both reading and writing. What if you want to only expose reading the current value, but not writing? There’s no way to do this.

The biggest problem is that operators on a BehaviorSubject produce the same Observable types that those operators always do, which again loses the ability to read the current value. You end up with a derived Observable where the subscriber always gets called immediately (unless you drop or filter or do something else to prevent this), so it certainly always has a current value, you just can’t read it. This has forced me to do very stupid stuff like this:

BehaviorSubject<Integer> someInt = BehaviorSubject.createDefault(5);

Observable<String> stringifiedInt = someInt
    .map(value -> value.toString());

...

final String[] currentStringifiedInt = { null };

Disposable subscription = stringifiedInt
    .subscribe(value ->
    {
        currentStringifiedInt[0] = value;
    });

subscription.dispose();

System.out.print("Current value: " + currentStringifiedInt[0]);
...

This is ugly, verbose, obtuse and unsafe. I have to subscribe just to trigger the callback to produce the current value for me, then immediately close the subscription because I don’t want that callback getting called again. I have to rely on the fact that a BehaviorSubject-derived observable will emit items immediately (synchronously), to ensure currentStringifiedInt gets populated before I use it. If I turn the derived observable back into a BehaviorSubject (which basically subscribes internally and sticks each updated value into the new BehaviorSubject), I can read the current value, but I can also write to it myself, thereby breaking the relationship between the derived observable and the source BehaviorSubject.

The fundamental problem is that observable values and event streams aren’t the same thing. We need a separate type for this. Specifically, we need two interfaces: one for read-only observable values, and one for read-write observable values. This is where we’re going to see the type of subscribe-driven lazy evaluation that we see inside of Rx Observables. Derived observables are read-only. Reading them triggers whatever cascade of processing and upstream reading is necessary to produce the value. When we subscribe, that is where it will subscribe to its source observables, inducing them to compute their values when necessary (when those values update) to send them downstream.

Furthermore, the subscribe method on our Observable should explicitly ask whether the subscriber wants to be immediately notified with the current value (by requiring a boolean parameter). Since we have a separate abstraction for observable values, we know there is always a current value, so this question always makes sense.
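These two interfaces can be sketched as follows. The names (Value, MutableValue) are my own, and caching is left out; the point is the shape of the API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// A read-only observable value. There is always a current value, so both
// reading and the notify-immediately question always make sense.
interface Value<T> {
    T value();

    void subscribe(Consumer<T> subscriber, boolean notifyImmediately);

    // Derived values are read-only. Reading is lazy: the transform runs on
    // each read (a caching operator, not shown, would store the result).
    default <U> Value<U> map(Function<T, U> transform) {
        Value<T> source = this;
        return new Value<U>() {
            public U value() { return transform.apply(source.value()); }
            public void subscribe(Consumer<U> subscriber, boolean notifyImmediately) {
                source.subscribe(v -> subscriber.accept(transform.apply(v)), notifyImmediately);
            }
        };
    }
}

// The read-write variant. Writing is only exposed here, and is what publishes.
class MutableValue<T> implements Value<T> {
    private T current;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    public MutableValue(T initial) { current = initial; }

    public T value() { return current; }

    public void subscribe(Consumer<T> subscriber, boolean notifyImmediately) {
        subscribers.add(subscriber);
        if (notifyImmediately) subscriber.accept(current);
    }

    public void set(T newValue) {
        current = newValue;
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) subscriber.accept(newValue);
    }
}
```

With this, the stringified-int contortion from earlier collapses to a single synchronous read of a derived value, with no subscription gymnastics, and the derived value stays read-only.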

Since the default is lazy, and therefore expensive and repetitious evaluation, we’ll need an operator specifically to store a derived observable in memory for quick evaluation. Is this comparable to turning a cold (lazy) Rx Observable into a hot (eager) one? No, because the thing you subscribe to with observable values, the changes, are always hot. They happen, and you miss them if you aren’t subscribed. Caching is purely a matter of efficiency, trading computing time for computing space (storage). It has no impact whatsoever on when updates get published.

Caching will affect whether transformations to produce a value are run repeatedly, but only for synchronous reads (multiple subscribers won’t cause repeated calculations). The major difference is that we can eliminate repeated side-effects from double-calculating a value without changing how or when its updates are published. What subscribers see is totally separate from whether an observable value is cached, unlike in Rx where “sharing” an Observable changes what subscribers see (it causes them to miss what they otherwise would have received).

A single Observable represents a single value. Multiple subscribers means multiple people are interested in one value. There’s no issue of “making sure all observers see the same sequence”. If a late subscriber comes in, he’ll either request the current value, whatever it is, or just request to be notified of later changes. The changes are true events (they happen, or they don’t, and if they do they happen at a specific time). We’d never need to duplicate calculations to make multiple subscribers see stale updates.

Furthermore, we communicate more clearly what, if any, “side effects” should be happening inside a transformation. They should be limited to whatever is necessary to calculate the value. If we have a derived value that requires an HTTP request to calculate it, this request will go out either when the source value changes, requiring a re-evaluation, or it will happen when someone tries to read the value… unless we cache it, which ensures the request always goes out as soon as it can. It is multiple synchronous reads that would, for non-cached values, trigger multiple requests, not multiple subscribers. This makes sense. If we’ve specified we don’t want to store the value, we’re saying each time we want to query the value we need to do the work of computing it.

Derived (and therefore read-only) observable values, which can both be subscribed to and read synchronously, are the most important missing piece in Rx. It’s so important I’ve gone through the trouble multiple times to build rudimentary versions of it in some of my apps.

“Completion” obviously makes no sense for observable values. They never stop existing. Errors should probably never happen in transformations. If a runtime exception sneaks through, it’s going to break the observable. It will need to be rethrown every time anyone tries to read the value (and what about subscribing to updates?). The possibility of failure stretches the concept of a value whose changes can be observed past, in my opinion, its range of valid interpretation. You can, of course, define a value that has two variations of success and failure (aka a Result), but the possibility of failure is baked into the value itself, not its observability.

Tasks

The final abstraction is tasks. Tasks are just asynchronous function invocations. They are started, and they do or do not produce a result. This is fundamentally different from any kind of “stream” because tasks only produce one result. They may also fail, in which case they produce one exception. The central focus of tasks is not so much on the value they produce but on the process of producing it. The fact the process is nontrivial and long-running is the only reason you’d pick a task over a regular function to begin with. As such, tasks expose an interface to start, pause/resume and cancel. Tasks are, in this way, state machines.

Unlike any of the other abstractions, tasks really do have distinct steps for starting and finishing. This is what ConnectableObservable is trying to capture with its addition (or rather, separation from subscribe) of connect. The request and the response are always distinct. Furthermore, once a task is started, it can’t be “restarted”. Multiple people waiting on its response doesn’t trigger the work to happen multiple times. The task produces its result once, and stores it as long as it hangs around in case anyone else asks for it.

Since the focus here is on the process, not the result, task composition looks fundamentally different from stream composition. Stream composition, including pipelines, focuses on the events or values flowing through the network. Task composition, while it does deal with results, is primarily concerned with dependency: exactly when each subtask can be started, relative to when other tasks start or finish, and whether tasks can run in parallel or must run serially. This is a concern even for tasks that don’t produce results.

Since tasks can fail, they also need to deal with error propagation. An error in a task means an error occurring somewhere in the process of running the task: moving it from start to finish. It’s the finishing that is sabotaged by an error, not the starting. We expect starting a task to always succeed. It’s the finishing that might never happen due to an error. This is represented by an additional state for failed. This is why it is not starting a task that would throw an exception, but waiting on its result. It makes sense that in a composed task, if a subtask fails, the outer task may fail. The outer task either expects and handles the error by trapping it, or it doesn’t, in which case it propagates out and becomes a failure of the outer task.

This propagation outward of errors, through steps that simply ignore those errors (and therefore, ideally, should contain absolutely no boilerplate code for simply passing an error through), is similar to data streams, and it therefore demands a synchronous interface. This is a little more tricky though because tasks are literally concerned with composing asynchrony. Even if we’re totally okay with blocking threads, what if we want subtasks to start simultaneously? Well, that’s what separating starting from waiting on the result lets us do. We only need to block when we need the result. That can be where exceptions are thrown, and they’ll automatically propagate through steps that don’t deal with them, which is exactly what we want. This separates when an exception is thrown from when an exception is (potentially) caught, and therefore requires tasks to cache exceptions just like they do their result.

We can, of course, avoid blocking any threads by using coroutines. That’s exactly what the .NET Tasks do. If you’re in a language that doesn’t have coroutines, I have the same advice I have for data streams: just block the damn threads. You’ll tear your hair out with the handleResult/handleError pyramids of callback doom, where most of your handleError callbacks are just calling the outer handleError to pass errors through.

What’s missing in the Task APIs I’ve seen is functional transformations like what we have on the other abstractions. This is probably because the need is much less. It’s not hard at all to do what is essentially a map on a Task:

async Task<MappedResult> mapATask()
{
    Task<Result> sourceTask = getSourceTask();
    Func<Result, MappedResult> transform = getTransform();

    return transform(await sourceTask);
}

But still, we can eliminate some of that boilerplate with some nice extension methods:

static async Task<MappedResult> Map<Result, MappedResult>(this Task<Result> thisTask, Func<Result, MappedResult> transform)
{
    return transform(await thisTask);
}

...

Task<Result> someTask = getTask();

await someTask
  .Map(someTransform)
  .Map(someOtherTransform);
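For what it’s worth, Java’s CompletableFuture does ship with exactly this operator, under the name thenApply:

```java
import java.util.concurrent.CompletableFuture;

// thenApply is CompletableFuture's built-in map: it transforms the eventual
// result without blocking, producing a new derived task.
class ThenApplyDemo {
    static CompletableFuture<String> stringifiedAnswer() {
        return CompletableFuture.supplyAsync(() -> 21)
            .thenApply(x -> x * 2)           // map: Integer -> Integer
            .thenApply(x -> "answer: " + x); // map: Integer -> String
    }
}
```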

Conclusion

By separating out these four somewhat similar but ultimately distinct concepts, we’ll find that the “hot” vs. “cold” distinction is expressed by choosing the right abstraction, and this is exposed to the clients, not hidden in the implementation details. Furthermore, the implication of side effects is easier to understand and address. We make a distinction of how “active” or “passive” different actions are. Observing an event is totally passive, and cannot itself incur side effects. Constructing a derived event stream is not passive, it entails the creation of new events. Consuming a value in a data stream is also not passive. Notice that broadcasting requires passivity. The only one-to-many operations available, once we distinguish the various abstractions, are observing an event stream and observing changes to an observable value. The former alone cannot incur side effects itself, and the latter can only incur side effects when going from no observers to more than none, and thus is independent of the multiplicity of observers. We have, in this way, eliminated the possibility of accidentally duplicating effort in the almost trivial manner that is possible in Rx.

In the next part, we’ll talk about those transformation operators, and what they look like after separating the abstractions.

On ReactiveX – Part I

If a tree falls in the forest and no one hears it, does it make a sound?

The ReactiveX libraries have finally answered this age-old philosophical dilemma. If no one is listening for the tree falling, not only does it not make a sound, the tree didn’t even fall. In fact, the wind that knocked the tree down didn’t even blow. If no one’s in the forest, then the forest doesn’t exist at all.

Furthermore, if there are three people in the forest listening, there are three separate sounds that get made. Not only that, there are three trees, each one making a sound. And there are three gusts of wind to knock each one down. There are, in fact, three forests.

ReactiveX is the Copenhagen Interpretation on steroids (or, maybe, just taken to its logical conclusion). We don’t just discard counterfactual definiteness, we take it out back and shoot it. What better way to implement Schrodinger’s Cat in your codebase than this:

final class SchrodingersCat extends Observable<Boolean>
{
    public SchrodingersCat()
    {
        cat = new Cat("Mittens");
    }

    @Override
    protected void subscribeActual(@NonNull Observer<? super Boolean> observer)
    {
        if(!observed)
        {
            observed = true;

            boolean geigerCounterTripped = new Random().nextInt(2) == 0;
            if(geigerCounterTripped)
                new BluntInstrument().murder(cat);
        }

        observer.onNext(cat.alive());
    }

    private final Cat cat;

    private boolean observed = false;
}

In this example, I have to go out of my way to prevent multiple observers from creating multiple cats, each with its own fate. Most Observables aren’t like that.

When you first learn about ReactiveX (Rx, as I will refer to it from now on), it’s pretty cool. The concept of transforming event streams, whose values occur over time, as opposed to collections (Arrays, Dictionarys, etc.), whose values occur over space (memory, or some other storage location), the same way that you transform collections (map, filter, zip, reduce, etc.), immediately struck me as extremely powerful. And, to be sure, it is. This began the Rx Honeymoon. The first thing I knew would benefit massively from these abstractions was the thing I had already learned to write reactively, but without the help of explicit abstractions for that purpose: graphical user interfaces.

But, encouraged by the “guides”, I didn’t stop there. “Everything is an event stream”, they said. They showed me the classic example of executing a web request, parsing its result, and attaching it to some view on the UI. It seems like magic. Just define your API service’s call as an Observable, which is just a map of the Observable for a general HTTP request (if your platform doesn’t provide one for you, you can easily write it by bridging a callback interface to an event stream). Then just do some more mapping and you have a text label that displays “loading…” until the data is downloaded, then it automatically switches to display the loaded data:

class DataStructure
{
    String title;
    int version;
    String displayInfo;
    ...
}

class AppScreenViewModel
{
    ...

    public final Observable<String> dataLabelText;

    ...

    public AppScreenViewModel()
    {
        ...

        Observable<HTTPResponse> response = _httpClient.request(
             "https://myapi.com/getdatastructure",
             HTTPMethod::get
         );

        Observable<DataStructure> parsedResponse = response
            .map(r -> new JSONParser().parse<DataStructure>(r.body, new DataStructure()));

        Observable<String> loadedText = parsedResponse
             .map(dataStructure -> dataStructure.displayInfo);

        Observable<String> loadingText = Observable.just("Loading...");

        dataLabelText = loadingText
             .merge(loadedText);
    }
}

...

class AppScreen : View
{
    private TextView dataLabel;

    ...

    private void bindToViewModel(AppScreenViewModel viewModel)
    {
        subscription = viewModel.dataLabelText
            .subscribe(text -> dataLabel.setText(text));
    }
}

That’s pretty neat. And you wouldn’t actually write it like this. I just did it like this to illustrate what’s going on. It would more likely look something like this:

    public AppScreenViewModel()
    {
        ...

        dataLabelText = getDataStructureRequest(_httpClient)
            .map(response -> parseResponse<DataStructure>(response, new DataStructure()))
            .map(dataStructure -> dataStructure.displayInfo)
            .merge(Observable.just("Loading..."));

        ...
    }

And, of course, you’d want to move the low-level HTTP client stuff out of the ViewModel. What you get is an elegant expression of a pipeline of retrieval and processing steps, with the end of the pipe plugged into your UI. Pretty neat!

But… hold on. I’m confused. I have my UI subscribe (that is, listen) to a piece of data that, through a chain of processing steps, depends on the response to an HTTP request. I can understand why, once the response comes in, the data makes its way to the text label. But where did I request the data? Where did I tell the system to go ahead and issue the HTTP request, so that eventually all of this will get triggered?

The answer is that it happens automatically upon subscribing to this pipeline of events, and that is also when it happens. The subscription happens in bindToViewModel. The request is triggered by that method calling subscribe on the string observable, which in turn subscribes to all the upstream observables, because that’s how the Observables returned by operators like map and merge work.
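To make that “subscribe is what triggers the work” behavior concrete, here is a minimal sketch of a cold observable in plain Java. The `Cold` class and all of its names are illustrative, not RxJava’s actual API: building the pipeline does nothing; the side effect runs only when `subscribe` is called.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative sketch, not RxJava's real API: a "cold" observable wraps a
// producer function and runs it only when someone subscribes.
public class ColdDeferral {
    static class Cold<T> {
        final Supplier<T> producer;
        Cold(Supplier<T> producer) { this.producer = producer; }
        void subscribe(Consumer<T> observer) {
            // Producing the value is where the side effect (the HTTP call) lives.
            observer.accept(producer.get());
        }
    }

    // Returns how many "requests" went out: first before, then after subscribe.
    static int[] demo() {
        AtomicInteger requests = new AtomicInteger(0);
        Cold<String> response = new Cold<>(() -> {
            requests.incrementAndGet(); // stands in for issuing the HTTP request
            return "response-body";
        });
        int before = requests.get();    // pipeline built, nothing sent yet
        response.subscribe(body -> {}); // subscribing fires the request
        int after = requests.get();
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("before subscribe: " + counts[0] + ", after: " + counts[1]);
    }
}
```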

Okay… that makes sense, I guess. But it’s kind of a waste of time to wait until then to send the request out. We’re ready to start downloading the data as soon as the view-model is constructed. Minor issue, I guess, since in this case these two times are probably a fraction of a second apart.

But now let’s say I also want to send that version number to another text label:

class DataStructure
{
    String title;
    int version;
    String displayInfo;
    ...
}

class AppScreenViewModel
{
    ...

    public final Observable<String> dataLabelText;
    public final Observable<String> versionLabelText;

    ...

    public AppScreenViewModel()
    {
        ...

        Observable<DataStructure> dataStructure = getDataStructureRequest(_httpClient)
            .map(response -> parseResponse<DataStructure>(response, new DataStructure()));

        String loadingText = "Loading...";

        dataLabelText = dataStructure
            .map(dataStructure -> dataStructure.displayInfo)
            .merge(Observable.just(loadingText));

        versionLabelText = dataStructure
            .map(dataStructure -> Integer.toString(dataStructure.version))
            .merge(Observable.just(loadingText));
    }
}

...

class AppScreen : View
{
    private TextView dataLabel;
    private TextView versionLabel;

    ...

    private void bindToViewModel(AppScreenViewModel viewModel)
    {
        subscriptions.add(viewModel.dataLabelText
            .subscribe(text -> dataLabel.setText(text)));

        subscriptions.add(viewModel.versionLabelText
            .subscribe(text -> versionLabel.setText(text)));
    }
}

I fire up my app, and then notice in my web proxy that the call to my API went out twice. Why did that happen? I didn’t create two of the HTTP request observables. But remember I said that the request gets triggered in subscribe? Well, we can clearly see two subscribes here. They are each on different observables, but both of those observables are the result of operator chains that begin with the HTTP request observable. Their subscribe methods call subscribe on the “upstream” observable. Thus, each of the two chains eventually calls subscribe, once, on the HTTP request observable.

The honeymoon is wearing off.

Obviously this isn’t acceptable. I need to fix it so that only one request gets made. The ReactiveX docs refer to these kinds of observables as cold. They don’t do anything until you subscribe to them, and when you do, they emit the same items for each subscriber. Normally, we might think of “items” as just values. So at worst this just means we’re making copies of our structures. But really, an “item” in this world is any arbitrary code that runs when the value is produced. This is what makes it possible to stuff very nontrivial behavior, like executing an HTTP request, inside an observable. By “producing” the value of the HTTP response, we execute the code that calls the HTTP client. If we produce that value for “n” listeners, we literally have to produce it “n” times, which means we call the service “n” times.
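The duplication can be demonstrated with the same kind of minimal sketch (again, the `Cold` class is illustrative, not RxJava’s API): every subscriber to a cold observable re-runs the producer, so two subscribers means the stand-in HTTP call runs twice.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative sketch, not RxJava's real API: with a cold observable, every
// subscriber re-runs the producer, so n subscribers mean n HTTP requests.
public class ColdDuplication {
    static class Cold<T> {
        final Supplier<T> producer;
        Cold(Supplier<T> producer) { this.producer = producer; }
        void subscribe(Consumer<T> observer) { observer.accept(producer.get()); }
    }

    static int requestsForTwoSubscribers() {
        AtomicInteger requests = new AtomicInteger(0);
        Cold<String> response = new Cold<>(() -> {
            requests.incrementAndGet(); // stand-in for the HTTP call
            return "body";
        });
        response.subscribe(s -> {});    // the dataLabelText chain subscribes...
        response.subscribe(s -> {});    // ...and so does the versionLabelText chain
        return requests.get();
    }

    public static void main(String[] args) {
        System.out.println("requests sent: " + requestsForTwoSubscribers());
    }
}
```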

The nontrivial code that happens as part of producing the next value in a stream is what we can call side effects. This is where the hyper-Copenhagen view of reality starts getting complicated (if it wasn’t already). That tree falling sound causes stuff on its own. It chases birds off, and shakes leaves off of branches. Maybe it spooks a deer, causing it to run into a street, which causes a car driving by to swerve into a service pole, knocking it down and cutting off the power to a neighborhood miles away. So now, “listening” to the tree falling sound means being aware of anything that was caused by that sound. Sitting in my living room and having the lights go out now makes me an observer of that sound.

There’s a reason Schrödinger put the cat in a box: to try as best he could to isolate events inside the box from events outside. Real life isn’t so simple. “Optimizing” out the unobserved part of existence requires you to draw a line (or box?) around all the effects of a cause. The Butterfly Effect laughs derisively at the very suggestion.

Not all Observables are like this. Some of them are hot. They emit items completely on their own terms, even if no subscribers are present. By subscribing, you’ll receive the same values at the same time as any other subscribers. If one subscriber subscribes late, they’ll miss any previously emitted items. An example would be an Observable for mouse clicks. Obviously a new subscriber can’t make you click the mouse again, and you can click the mouse before any subscribers show up.

To fix our problem, we need to convert the cold HTTP response observable to a hot one. We want it to emit its value (the HTTP response, which as a side effect will trigger the HTTP request) of its own accord, independent of who subscribes. This will solve both the problem of waiting too long to start the request and the problem of the request going out twice. To do this, Rx gives us a subclass of Observable called ConnectableObservable. In addition to subscribe, these also have a method connect, which triggers them to start emitting items. I can use the publish operator to turn a cold observable into a connectable hot one. This way, I can start the request immediately, without duplicating it:

        ConnectableObservable<DataStructure> dataStructure = getDataStructureRequest(_httpClient)
            .map(response -> parseResponse<DataStructure>(response, new DataStructure()))
            .publish();

        dataStructure.connect();

        String loadingText = "Loading...";

        dataLabelText = dataStructure
            .map(dataStructure -> dataStructure.displayInfo)
            .merge(Observable.just(loadingText));

        versionLabelText = dataStructure
            .map(dataStructure -> Integer.toString(dataStructure.version))
            .merge(Observable.just(loadingText));

Now I fire it up again. Only one request goes out! Yay!! But wait… both of my labels still say “Loading…”. What happened? They never updated.

The response observable is now hot: it emits items on its own. Whatever subscribers are there when that item gets emitted are triggered. Any subscribers that show up later miss earlier items. Well, my dev server running in a VM on my laptop here served up that API response in milliseconds, faster than the time between this code running and the View code subscribing to these observables. By the time they subscribed, the response had already been emitted, and the subscribers miss it.
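The failure mode can be sketched in a few lines (illustrative `Connectable` class, not RxJava’s API): connect produces the value once and pushes it only to whoever is subscribed at that moment, so a late subscriber receives nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative sketch, not RxJava's real API: connect() produces the value
// once and pushes it to the subscribers present at that moment. A subscriber
// that shows up afterwards receives nothing.
public class HotMiss {
    static class Connectable<T> {
        final Supplier<T> producer;
        final List<Consumer<T>> observers = new ArrayList<>();
        Connectable(Supplier<T> producer) { this.producer = producer; }
        void subscribe(Consumer<T> observer) { observers.add(observer); }
        void connect() {
            T value = producer.get();          // side effect runs exactly once
            for (Consumer<T> o : observers) o.accept(value);
        }
    }

    static List<String> lateSubscriberSees() {
        Connectable<String> response = new Connectable<>(() -> "loaded");
        response.connect();                    // fast server: value emitted already
        List<String> received = new ArrayList<>();
        response.subscribe(received::add);     // the view subscribes too late
        return received;                       // empty: the label never updates
    }

    public static void main(String[] args) {
        System.out.println("late subscriber received: " + lateSubscriberSees());
    }
}
```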

Okay, back to the Rx books. There’s an operator called replay, which will give us a connectable observable that begins emitting as soon as we call connect, but also caches the items that come in. When anyone subscribes, it first powers through any of those cached items, sending them to the new subscriber in rapid succession, to ensure that every subscriber sees the same sequence of items:

        ConnectableObservable<DataStructure> dataStructure = getDataStructureRequest(_httpClient)
            .map(response -> parseResponse<DataStructure>(response, new DataStructure()))
            .replay();

        dataStructure.connect();

        String loadingText = "Loading...";

        dataLabelText = dataStructure
            .map(dataStructure -> dataStructure.displayInfo)
            .merge(Observable.just(loadingText));

        versionLabelText = dataStructure
            .map(dataStructure -> Integer.toString(dataStructure.version))
            .merge(Observable.just(loadingText));

I fire it up, still see one request go out, but then… I see my labels briefly flash with the loaded text, then go back to “Loading…”. What the fu…

Think carefully about the last operator, the merge: if the response comes in before the view subscribes, we’re actually constructing a stream that consists first of the response-derived string, and then the text “Loading…”. So it’s doing what we told it to do. It’s just confusing. The replay operator, as I said, fires off the exact sequence of emitted items, in the order they were originally emitted. That’s what I’m seeing.

But wait… I’m not replaying the merged stream. I’m replaying the upstream event of the HTTP response. Now it’s not even clear to me what that means. I need to think about this… the dataStructure stream is a replay of the underlying stream that makes the request, emits the response, then maps it to the parsed object. That all happens almost instantaneously after I call connect. That one item gets cached, and when anyone subscribes, it loops through and emits the cached items, which is just that one. Then I merge this with a Just stream. What does Just mean, again? Well, that’s a stream that emits just the item given to it whenever you subscribe to it. Each subscriber gets that one item. Okay, and what does merge do? Well, the subscribe method of a merged stream subscribes to both the upstream observables used to build it, so that the subscriber gets triggered by either one’s emitted items. It has to subscribe to both in some order, and I guess it makes sense that it first subscribes to the stream on which merge was called, and then subscribes to the other stream passed in as a parameter.

So what’s happening is that when I call subscribe on what is, in fact, a merged stream, it first subscribes to the replay stream, which already has a cached item and therefore immediately emits it to the subscriber. Then it subscribes to the Just stream, which immediately emits the loading text. Hence, I see the loaded text, then the loading text.

If I swapped the operands so that the Just is what I call merge on, and the mapped data structure stream is the parameter, then the order reverses. That’s scary. I didn’t even think to consider that the placement of those two in the call would matter.
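If we model merge the way the article guesses (subscribe to the receiver first, then to the argument), the order sensitivity is easy to reproduce. This is an illustrative sketch with made-up `Source`, `just`, and `merge` helpers, not RxJava’s API: with two sources that emit synchronously on subscribe, swapping the operands reverses the emission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch, not RxJava's real API: when both sources emit
// synchronously on subscribe, a merged stream delivers the receiver's items
// first, then the argument's, so swapping the operands reverses the order.
public class MergeOrder {
    interface Source<T> { void subscribe(Consumer<T> observer); }

    static <T> Source<T> just(T value) {
        return observer -> observer.accept(value); // emits immediately on subscribe
    }

    static <T> Source<T> merge(Source<T> receiver, Source<T> argument) {
        return observer -> {
            receiver.subscribe(observer);          // subscribed first...
            argument.subscribe(observer);          // ...then the parameter
        };
    }

    static List<String> emissionOrder(Source<String> merged) {
        List<String> order = new ArrayList<>();
        merged.subscribe(order::add);
        return order;
    }

    public static void main(String[] args) {
        System.out.println(emissionOrder(merge(just("loaded"), just("Loading..."))));
        System.out.println(emissionOrder(merge(just("Loading..."), just("loaded"))));
    }
}
```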

Sigh… okay, I need to express that the loading text needs to always come before the loaded text. Instead of using merge, I need to use prepend. That makes sure all the events of the stream I pass in will get emitted before any events from the other stream:

    ConnectableObservable<DataStructure> dataStructure = getDataStructureRequest(_httpClient)
        .map(response -> parseResponse<DataStructure>(response, new DataStructure()))
        .replay();

    dataStructure.connect();

    String loadingText = "Loading...";

    dataLabelText = dataStructure
        .map(dataStructure -> dataStructure.displayInfo)
        .prepend(Observable.just(loadingText));

    versionLabelText = dataStructure
        .map(dataStructure -> Integer.toString(dataStructure.version))
        .prepend(Observable.just(loadingText));

Great, now the labels look right! But wait… I always see “Loading…” briefly flash on the screen. All the trouble I just dealt with derived from my dev server responding before my view gets created. I shouldn’t ever see “Loading…”, because by the time the labels are being drawn, the loaded text is available.

But the above explanation covers this as well. We’ve constructed a stream where every subscriber will get the “Loading…” item first, even if the loaded text comes immediately after. The prepend operator produces a cold stream. It always emits the items in the provided stream before switching to the one we’re prepending to.

The stream is still too cold. I don’t want subscribers to always see the full sequence of items. If they come in late, I want them to see only the latest ones. But I don’t want the stream to be entirely hot either: then, if a subscriber comes in after the loaded text is emitted, it will never receive any events. I need to Goldilocks this stream. I want subscribers to receive only the last item emitted, and none before that. I need to move the replay up to the concatenated streams, and I need to specify that the cache should never hold more than a single item:

    Observable<DataStructure> dataStructure = getDataStructureRequest(_httpClient)
        .map(response -> parseResponse<DataStructure>(response, new DataStructure()));

    String loadingText = "Loading...";

    dataLabelText = dataStructure
        .map(dataStructure -> dataStructure.displayInfo)
        .prepend(Observable.just(loadingText))
        .replay(1);

    dataLabelText.connect();

    versionLabelText = dataStructure
        .map(dataStructure -> Integer.toString(dataStructure.version))
        .prepend(Observable.just(loadingText))
        .replay(1);

    versionLabelText.connect();
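The replay(1) semantics can be sketched with an illustrative `ReplayLast` class (made-up names, not RxJava’s API): only the latest item is cached, so a late subscriber sees the final state and never the stale “Loading…” that preceded it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch, not RxJava's real API: a replay(1) stream caches only
// the latest item, so a late subscriber sees the final state and never the
// stale "Loading..." that came before it.
public class ReplayOne {
    static class ReplayLast<T> {
        final List<Consumer<T>> observers = new ArrayList<>();
        private T last;
        private boolean hasValue = false;

        void emit(T value) {
            last = value;
            hasValue = true;
            for (Consumer<T> o : new ArrayList<>(observers)) o.accept(value);
        }

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
            if (hasValue) observer.accept(last); // replay only the latest item
        }
    }

    static List<String> lateSubscriberSees() {
        ReplayLast<String> label = new ReplayLast<>();
        label.emit("Loading...");
        label.emit("loaded");                    // the response beat the view
        List<String> received = new ArrayList<>();
        label.subscribe(received::add);          // late subscriber
        return received;                         // just the latest item: no flash
    }

    public static void main(String[] args) {
        System.out.println("late subscriber received: " + lateSubscriberSees());
    }
}
```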

Okay, there we go, the flashing is gone. Oh shit! Now two requests are going out again. By moving the replay up to after the stream bifurcated, each derived stream is subscribing and caching its own item, so each one triggers the HTTP response to get “produced”. Ugh… I have to keep that first replay to “share” the response with each derived stream and ensure each one gets it even if it arrived before their own connect calls.

This is all the complexity we have to deal with just to handle a simple Y-shaped network of streams driving two labels on a user interface. Can you imagine building an entire, even moderately complex, app as an intricate network of streams, and having to worry about how “hot” or “cold” each edge in the stream graph is?

Is the honeymoon over yet?

All this highly divergent behavior is hidden behind a single interface called Observable, which intentionally obscures it from the users of the interface. When an object hands you an Observable to use in some way, you have no idea what kind of observable it is. That makes it difficult or impossible to track down or even understand why a system built out of reactive event streams is behaving the way it is.

This is the point where I throw up my hands and say wait wait wait… why am I trying to say an HTTP request is a stream of events? It’s not a stream of any sort. There’s only one response. How is that a “stream”? What could possibly make me think that’s the appropriate abstraction to use here?

Ohhh, I see… it’s asynchronous. Rx isn’t just a library with a powerful transformable event stream abstraction. It’s the “all purpose spray” of concurrency! Any time I need a callback, which I put in because I don’t want to block a thread on a long-running process, that’s apparently a cue to turn it into an Observable, and then it spreads to everything that gets triggered by that callback, and so on and so on. Great. I graduated from callback hell to Rx hell. I’ll have to consult Dante’s map to see if that moved me up a level or down.

In the next part, I’ll talk about whether any of the Rx stuff is worth salvaging, and why things went so off the rails.