On ReactiveX – Part II

In the last part, we explored the bizarre world of extreme observer-dependence that gets created in an ReactiveX (Rx)-driven app, and how that world rapidly descends into hell, especially when it is applied as a blanket solution to every problem.

Is the correct reaction to all of this to say “Screw Rx” and be done with it? Well, not entirely. The part where we try to cram every shaped peg into a square hole, we should absolutely say “to hell” with that. Whenever you see library tutorials say any variant of “everything is an X”, you should back away slowly, clutching whatever instrument of self-defense you carry. The only time that statement is true is if X = thing. Yes, everything is a thing… and that’s not very insightful, is it? The reason “everything is an X” with some more specific X seems profound is because it’s plainly false, and you have to imagine some massive change in perception for it to possibly be true.

Rx’s cult of personality cut its way through the Android world a few years ago, and now most of its victims have sobered up and moved on. In what is a quintessentially Apple move, Apple invented their own, completely proprietary and incompatible version of Rx, called Combine, a couple of years ago, and correspondingly the “everything is a stream” drug is making its rounds through the iOS world. It, too, will come to pass. A large part of what caused RxJava to wane is Kotlin coroutines, and with finally Swift gaining async/await, Combine will subside as well. Why do these “async” language features replace Rx? Because Rx was touted as the blanket solution to concurrency.

Everything is not an event stream, or an observable, period. Some things are. Additionally, the Rx Observable is a concept with far too much attached to it. It is trying to be so many things at once, owing to the fact it’s trying to live up to the “everything is a me” expectation, which will only result in Observable becoming a synonym for Any, except instead of it doing what the most general, highest category should do (namely, nothing), it endeavors instead to do the opposite: everything. It’s a God object in the making. That’s why it ends up everywhere in your code, and gradually erodes all the information a robust type system is supposed to communicate.

But is an event stream, endeavoring only to be an event stream, with higher-order quasi-functional transformations, a useful abstraction? I believe it’s a massively useful one. I still use it for user interfaces, but I reluctantly do so with Rx’s version of one, mostly because it’s the best one available.

The biggest problem with Rx is that its central abstraction is really several different abstractions, all crammed together. After thinking about this for a while, I have identified four distinct concepts that have been merged under the umbrella of the Observable interface. By disentangling these from each other, we can start to rebuild more focused, well-formed libraries that aren’t infected with scope creep.

These are the four abstractions I have identified:

  • Event streams
  • Data streams
  • Observable values
  • Tasks

Let’s talk about each one, what they are (and just as importantly, what they aren’t), and how they are similar to and different from Rx’s Observable.

Event Streams

Let us return from the mind-boggling solipsism of extreme Copenhagen interpretation, where the world around us is brought into being by observing it, and return to classical realism, where objective reality exists independent of observation. Observation simply tells us what is out there. An event stream is literally a stream of events: things that occur in time. Observing is utterly passive. It does not, in any way, change what events occur. It merely signs one up to be notified when they do.

The Rx Observable practically commands the Copenhagen outlook by making the abstract method, to be overridden by the various subclasses returned by operators, subscribe. It is what, exactly, subscribing (a synonym for observing) means that varies with different types of streams. This is where the trouble starts. It sets us up to have subscribe be what controls publish.

A sane approach to an event stream is for the subscribe method to be final. Subscribing is what it is: it just adds a callback to the list of callbacks to be triggered when an event is published. It should not alter what is published. The interesting behavior should occur exclusively in the constructor of a stream.

Let us recall the original purpose of the Observer Pattern. The primary purpose is not really to allow one-to-many communication. That’s a corollary of its main purpose. The main purpose is to decouple the endpoints of communication, specifically to allow one object to send messages to another object without ever knowing about that other object, not even the interfaces it implements.

Well, this is no different than any delegation pattern. I can define a delegate in class A, then have class B implement that delegate, allowing A to communicate with B without knowing about B. So what is it, specifically, about the Observer pattern that loosens the coupling even more than this?

This answer is that the communication is strictly one way. If an A posts an event, and B happens to be listening, B will receive it, but cannot (without going through some other interface that A exposes) send anything back to Anot even a return value. Essentially, all the methods in an observer interface must have void returns. This is what makes one-to-many broadcasting a trivial upgrade to the pattern, and why you typically get it for free. Broadcasting with return values wouldn’t make sense.

The one-way nature of the message flow creates an objective distinction between the publisher (or sender) and the subscriber (or receiver). The intermediary that moves the messages around is the channel, or broker. This is distinct from, say, the Mediator Pattern, where the two ends of communication are symmetric. An important consequence of the asymmetry of observers is that the presence of subscribers cannot directly influence the publisher. In fact, the publisher in your typical Observer pattern implementation can’t even query who is a subscriber, or even how many subscribers there are.

A mediator is like your lawyer talking to the police. An observer is like someone attending a public speech you give, where the “channel” is the air carrying the sound of your voice. What you say through your lawyer depends on what questions the police ask you. But the speech you give doesn’t depend on who’s in the audience. The speaker is therefore decoupled from his audience to a greater degree than you are decoupled from the police questioning you.

By moving the publishing behavior into subscribe, Rx is majorly messing with this concept. It muddles the distinction between publisher/sender and subscriber/receiver, by allowing the subscribe/receive end of the chain to significantly alter what the publisher/sender side does. It’s this stretching of the word “observe” to mean something closer to “discuss” that can cause confusion like “why did that web request get sent five times?”. It’s because what we’re calling “observing a response event” is more like “requesting the response and waiting for it to arrive”, which is a two-way communication.

We should view event streams as a higher abstraction level for the Observer Pattern. An EventStream is just a wrapper around a channel, that encapsulates publishing and defines transformation operators that produce new EventStreams. The publishing behavior of a derived stream is set up at construction of the stream. The subscribe method is final. Its meaning never changes. It simply forwards a subscribe call to the underlying channel.

Event streams are always “hot”. If the events occur, they are published, if not, they aren’t. The transformation operations are eager, not lazy. The transform in map is evaluated on each event as soon as the event is published, independent of subscribers. This expresses the realism of this paradigm: those mapped events happen, period. Subscribing doesn’t make them happen, it just tells us about them. The way we handle whether derived streams continue to publish their derived events is by holding onto the stream. If a derived stream exists, it is creating and publishing derived events. If we want the derived events to stop firing, we don’t throw away subscriptions, we throw away the stream itself.

There’s no problem of duplication here. The subscribing is one-to-many, but the construction of the events, the only place where any kind of side effects can occur, is tied the construction of derived streams, which only happens once. One stream = one instance of each event. The other side of that coin is that missed events are missed, period. If you want any kind of caching behavior, that’s not an event stream. It’s something else.

I think we’ll also find that by separating out the other concepts we’ll get to next, the need to ever create event streams that have any side effects is reduced to essentially zero.

Rx streams have behavior for handling the stream “completing”, and handling exceptions that get thrown during construction of an item to be emitted. I have gone back and forth over whether it makes sense for a strict event stream to have a notion of “completing”. I lean more toward thinking it doesn’t, and that “completion” applies strictly to the next concept we’ll talk about.

What definitely does not make sense for event streams is failures. Event streams themselves can’t “fail”. Events happen or they don’t. If some exception gets thrown by a publisher, it’s a problem for the publisher, that’s either going to be trapped by the publisher, will kill the publisher, or kill the process. Having it propagate to subscribers, and especially having it (by design) terminate the whole stream doesn’t make sense.

Data Streams

The next concept is a data stream. How are “data” streams different from “event” streams? Isn’t an event just some data? Well, an event holds data, but the event is the occurrence itself. With data streams, the items are not things that occur at a specific time. They may become available at a specific time, but that time is otherwise meaningless. The only significance of the arrival time of a datum is that we have to wait for it.

More importantly, in a stream of data, every datum matters. It’s really the order, not the timing, of the items that’s important. It’s critical that someone reading the data stream receive every element in the correct order. If a reader wants to skip some elements, that’s his business. But it wouldn’t make sense for a reader to miss elements and not know it.

We subscribe to an event stream, but we consume a data stream. Subscribing is passive. It has no impact on the events in the stream. Consuming is active. It is what drives the stream forward. The “next” event in a stream is emitted whenever it occurs, independent of who is subscribed. The “next” event of a data stream is emitted when the consumer decides to consume it. In both cases, once an element is emitted, it is never re-emitted.

Put succinctly, an event stream is producer-driven, and a data stream is consumer-driven. An event stream is a push stream, and a data stream is a pull stream.

This means a data stream cannot be one-to-many. An event stream can have arbitrarily many subscribers, only because subscribing is passive; entirely invisible to the publisher. But a data stream cannot have multiple simultaneous consumers. If we passed a data stream to multiple consumers who tried to read at the same time, they would step on each others’ toes. One would consume a datum and cause the other one to miss it.

To clarify, we’re talking about a specific data stream we call an input stream. It produces values that a consumer consumes. The other type of data stream is an output stream, which is a consumer itself, rather than a producer. Output streams are a separate concept not related to Rx Observables, because Observables are suppliers, not consumers (consumers in Rx are called Subscribers).

Most languages already have input and output stream classes, but they aren’t generic. Their element type is always bytes. We can define a generic one like this:

interface InputStream<Element>
{
    bool hasMore();

    Element read();

    UInt skip(UInt count);
}

This time it’s a pure interface. There’s no default behavior. Different types of streams have to define what read means.

Data streams can be transformed in ways similar to event streams. But since the “active” part of a data stream is the reading, it is here that a derived stream will interact with its source stream. This will look more like how Rx Observable implements operators. The read method will be abstract, and each operator, like map and filter, will implement read by calling read on the source stream and applying the transform. In this case, the operators are lazy. The transform is not applied to a datum until a consumer consumes the mapped stream.

The obvious difference between this an Rx Observables is that this is a pull, rather than push, interface. The read method doesn’t take a callback, it returns a result. This is exactly what we want for a stream where the next value is produced by the consumer requesting it. A data stream is inherently a pull paradigm. A push-style interface just obscures this. Typical needs with data streams, for example reading “n” values, then switching to do other stuff and then returning to read some more, become incredibly convoluted with an interface designed for a stream where the producer drives the flow.

A pull interface requires that if the next datum isn’t available yet, the thread must block. This is the horror that causes people to turn everything into callbacks: so they never block threads. The phobia of blocking threads (which is really a phobia of creating your own threads that can be freely blocked without freezing the UI or starving a thread pool) is a topic for another day. For the sake of argument I’ll accept that it’s horrible and we must do everything to avoid it.

The proper solution to the problem of long-running methods with return values that don’t block threads is not callbacks. Callback hell is the price we pay for ever thinking it was, and Rx hell is really a slight variation of callback hell with even worse problems layered on top. The proper solution is coroutines, specifically async/await.

This is, of course, exactly how we’d do it today in .NET, or any other language that has coroutines. If you’re stuck with Java, frankly I think you should just let the thread block, and make sure you do the processing on a thread you created (not the UI thread). That is, after all, exactly how Java’s InputStream works. If you are really insistent on not blocking, use a Future. That allows consuming with a callback, but it at least communicates in some way that you only expect the callback to be called once. That means you get a Future each time you read a chunk of the stream. If that seems ugly/ridiculous to you, then just block the damn thread!

Data streams definitely have a notion of “completing”. Their interface needs to be able to tell a consumer that there’s nothing left to consume. How does it handle errors? Well, since the interface is synchronous, an exception thrown by a transformation will propagate to the consumer. It’s his business to trap it and decide how to proceed. It should only affect that one datum. It should be possible to continue reading after that. If an intermediate derived stream doesn’t deal with an exception thrown by a source stream, it will propagate through until it gets to an outer stream that handles it, or all the way out to the consumer. This is another reason why a synchronous interface is appropriate. It is exactly what try-catch blocks do. Callback interfaces require you to essentially try-catch on every step, even if a step actually doesn’t care about (and cannot handle) an error and simply forwards it. You know you hate all that boilerplate. Is it really worth all of that just to not block a thread?

(If I was told I simply cannot block threads I’d port the project to Kotlin before trying to process data streams with callbacks)

Observable Values

Rx named its central abstraction Observable. This made me think if I create an Observable<String>, it’s just like a regular String, except I can also subscribe to be notified when it changes. But that’s not at all what it is. It’s a stream, and streams aren’t values. They emit values, but they aren’t values themselves. What’s the difference, exactly? Well, if I had what was literally an observable String, I could read it, and get a String. But you can’t “read” an event stream. An event stream doesn’t have a “current value”. It might have a most recently emitted item, but those are, conceptually, completely different.

Unfortunately, in its endeavor toward “everything is me”, Rx provides an implementation of Observable whose exact purpose is to try to cram these two orthogonal concepts together: the BehaviorSubject. It is a literal observable value. It can be read to get its current value. It can be subscribed to, to get notified whenever the value changes. It can be written to, which triggers the subscribers.

But since it implements Observable, I can pass it along to anything that expects an Observable, thereby forgetting that its really a BehaviorSubject. This is where it advertises itself as a stream. You might think: well it is a stream, or rather changes to the value are a stream. And that is true. But that’s not what you’re subscribing to when you subscribe to a BehaviorSubject. Subscribing to changes would mean you don’t get notified until the next time the value gets updated. If it never changes, the subscriber would never get called. But subscribers to a BehaviorSubject always get called immediately with the current value. If all you know if you’ve got an Observable, you’ll have no idea if this will happen or not.

Once you’ve upcast to an Observable, you lose the ability to read the current value. To preserve this, you’ll have to expose it as a BehaviorSubject. The problem then becomes that this exposes both reading and writing. What if you want to only expose reading the current value, but not writing? There’s no way to do this.

The biggest problem is that operators on a BehaviorSubject produce the same Observable types that those operators always do, which again loses the ability to read the current value. You end up with a derived Observable where the subscriber always gets called immediately (unless you drop or filter or do something else to prevent this), so it certainly always has a current value, you just can’t read it. This has forced me to do very stupid stuff like this:

BehaviorSubject<Int> someInt = new BehaviorSubject<Int>(5);

Observable<String> stringifiedInt = someInt
    .map(value -> value.toString());

...

String currentStringifiedInt = null;

Disposable subscription = stringifiedInt
    .subscribe(value ->
    {
        currentStringifiedInt = value;
    });

subscription.dispose();

System.out.print("Current value: " + currentStringifiedInt);
...

This is ugly, verbose, obtuse and unsafe. I have to subscribe just to trigger the callback to produce the current value for me, then immediately close the subscription because I don’t want that callback getting called again. I have to rely on the fact that a BehaviorSubject-derived observable will emit items immediately (synchronously), to ensure currentStringifiedInt gets populated before I use it. If I turn the derived observable back into a BehaviorSubject (which basically subscribes internally and sticks each updated value into the new BehaviorSubject), I can read the current value, but I can write to it myself, thereby breaking the relationship between the derived observable and the source BehaviorSubject.

The fundamental problem is that observable values and event streams aren’t the same thing. We need a separate type for this. Specifically, we need two interfaces: one for read-only observable values, and one for read-write observable values. This is where we’re going to see the type of subscribe-driven lazy evaluation that we see inside of Rx Observables. Derived observables are read-only. Reading them triggers whatever cascade of processing and upstream reading is necessary to produce the value. When we subscribe, that is where it will subscribe to its source observables, inducing them to compute their values when necessary (when those values update) to send them downstream.

Furthermore, the subscribe method on our Observable should explicitly ask whether the subscriber wants to be immediately notified with the current value (by requiring a boolean parameter). Since we have a separate abstraction for observable values, we know there is always a current value, so this question always makes sense.

Since the default is lazy, and therefore expensive and repetitious evaluation, we’ll need an operator specifically to store a derived observable in memory for quick evaluation. Is this comparable to turning a cold (lazy) Rx Observable into a hot (eager) one? No, because the thing you subscribe to with observable values, the changes, are always hot. They happen, and you miss them if you aren’t subscribed. Caching is purely a matter of efficiency, trading computing time for computing space (storage). It has no impact whatsoever on when updates get published.

Caching will affect whether transformations to produce a value are run repeatedly, but only for synchronous reads (multiple subscribers won’t cause repeated calculations). The major difference is that we can eliminate repeated side-effects from double-calculating a value without changing how or when its updates are published. What subscribers see is totally separate from whether an observable value is cached, unlike in Rx where “sharing” an Observable changes what subscribers see (it causes them to miss what they otherwise would have received).

A single Observable represents a single value. Multiple subscribers means multiple people are interested in one value. There’s no issue of “making sure all observers see the same sequence”. If a late subscriber comes in, he’ll either request the current value, whatever it is, or just request to be notified of later changes. The changes are true events (they happen, or they don’t, and if they do they happen at a specific time). We’d never need to duplicate calculations to make multiple subscribers see stale updates.

Furthermore, we communicate more clearly what, if any, “side effects” should be happening inside a transformation. They should be limited to whatever is necessary to calculate the value. If we have a derived value that requires an HTTP request to calculate it, this request will go out either when the source value changes, requiring a re-evaluation, or it will happen when someone tries to read the value… unless we cache it, which ensures the request always goes out as soon as it can. It is multiple synchronous reads that would, for non-cached values, trigger multiple requests, not multiple subscribers. This makes sense. If we’ve specified we don’t want to store the value, we’re saying each time we want to query the value we need to do the work of computing it.

Derived (and therefore read-only) observable values, which can both be subscribed to and read synchronously, is the most important missing piece in Rx. It’s so important I’ve gone through the trouble multiple times to build rudimentary versions of it in some of my apps.

“Completion” obvious makes no sense for observable values. They never stop existing. Errors should probably never happen in transformations. If a runtime exception sneaks through, it’s going to break the observable. It will need to be rethrown every time anyone tries to read the value (and what about subscribing to updates?). The possibility of failure stretches the concept of a value whose changes can be observed past, in my opinion, its range of valid interpretation. You can, of course, define a value that has two variations of success and failure (aka a Result), but the possibility of failure is baked into the value itself, not its observability.

Tasks

The final abstraction is tasks. Tasks are just asynchronous function invocations. They are started, and they do or do not produce a result. This is fundamentally different from any kind of “stream” because tasks only produce one result. They may also fail, in which case they produce one exception. The central focus of tasks is not so much on the value they produce but on the process of producing it. The fact the process is nontrivial and long-running is the only reason you’d pick a task over a regular function to begin with. As such, tasks expose an interface to start, pause/resume and cancel. Tasks are, in this way, state machines.

Unlike any of the other abstractions, tasks really do have distinct steps for starting and finishing. This is what ConnectableObservable is trying to capture with its addition (or rather, separation from subscribe) of connect. The request and the response are always distinct. Furthermore, once a task is started, it can’t be “restarted”. Multiple people waiting on its response doesn’t trigger the work to happen multiple times. The task produces its result once, and stores it as long as it hangs around in case anyone else asks for it.

Since the focus here is on the process, not the result, task composition looks fundamentally different from stream composition. Stream composition, including pipelines, focuses on the events or values flowing through the network. While task composition deals with the results, it does so primarily in its concern with dependency, which is about the one thing task composition is really concerned with: exactly when the various subtasks can be started, relative to when other tasks start or finish. Task composition is concerned with whether tasks can be done in parallel or serially. This is even a concern for tasks that don’t produce results.

Since tasks can fail, they also need to deal with error propagation. An error in a task means an error occurring somewhere in the process of running the task: moving it from start to finish. It’s the finishing that is sabotaged by an error, not the starting. We expect starting a task to always succeed. It’s the finishing that might never happen due to an error. This is represented by an additional state for failed. This is why it is not starting a task that would throw an exception, but waiting on its result. It makes sense that in a composed task, if a subtask fails, the outer task may fail. The outer task either expects and handles the error by trapping it, or it doesn’t, in which case it propagates out and becomes a failure of the outer task.

This propagation outward of errors, through steps that simply ignore those errors (and therefore, ideally, should contain absolutely no boilerplate code for simply passing an error through), is similar to data streams, and it therefore demands a synchronous interface. This is a little more tricky though because tasks are literally concerned with composing asynchrony. Even if we’re totally okay with blocking threads, what if we want subtasks to start simultaneously? Well, that’s what separating starting from waiting on the result lets us do. We only need to block when we need the result. That can be where exceptions are thrown, and they’ll automatically propagate through steps that don’t deal with them, which is exactly what we want. This separates when an exception is thrown from when an exception is (potentially) caught, and therefore requires tasks to cache exceptions just like they do their result.

We can, of course, avoid blocking any threads by using coroutines. That’s exactly what the .NET Tasks do. If you’re in a language that doesn’t have coroutines, I have the same advice I have for data streams: just block the damn threads. You’ll tear your hair out with the handleResult/handleError pyramids of callback doom, where most of your handleError callbacks are just calling the outer handleError to pass errors through.

What’s missing in the Task APIs I’ve seen is functional transformations like what we have on the other abstractions. This is probably because the need is much less. It’s not hard at all to do what it is essentially a map on a Task:

async Task<MappedResult> mapATask()
{
    Task<Result> sourceTask = getSourceTask();
    Function<Result, MappedResult> transform = getTransform();

    return transform(await sourceTask);
}

But still, we can eliminate some of that boilerplate with some nice extension methods:

static async Task<MappedResult> Map<Result, MappedResult>(this Task<Result> ThisTask, Function<Result, MappedResult> transform)
{
    return transform(await ThisTask);
}

...

Task<Result> someTask = getTask();

await someTask
  .map(someTransform)
  .map(someOtherTransform);

Conclusion

By separating out these four somewhat similar but ultimately distinct concepts, we’ll find that the “hot” vs. “cold” distinction is expressed by choosing the right abstraction, and this is exposed to the clients, not hidden in the implementation details. Furthermore, the implication of side effects is easier to understand and address. We make a distinction of how “active” or “passive” different actions are. Observing an event is totally passive, and cannot itself incur side effects. Constructing a derived event stream is not passive, it entails the creation of new events. Consuming a value in a data stream is also not passive. Notice that broadcasting requires passivity. The only one-to-many operations available, once we distinguish the various abstractions, are observing an event stream and observing changes to an observable value. The former alone cannot incur side effects itself, and the latter can only occur side effects when going from no observers to more than none, and thus is independent of the multiplicity of observers. We have, in this way, eliminated the possibility of accidentally duplicating effort in the almost trivial manner that it is possible in Rx.

In the next part, we’ll talk about those transformation operators, and what they look like after separating the abstractions.

Leave a Reply

Your email address will not be published. Required fields are marked *