On ReactiveX – Part III

Introduction

In the last part, we took a knife to Rx’s one-size-fits-all “Observable” abstraction, and carved out four distinct abstractions, each with unique properties that should not be hidden from whoever is using them.

The true value, I believe, of Rx is in its transformation operators… the almost endless zoo of options for how to turn one Observable into another one. The programming style Rx encourages is to build every stream you need through operators, instead of by creating Subjects and publishing to them “imperatively”.

So this begs the question… what happens to this rich (perhaps too rich, as the sheer volume of them is headache-inducing) language of operators when we cleave the Observable into the EventStream, the DataStream, the ObservableValue and the Task?

What happens is operators also get divided, this time into two distinct categories. The first category is those operators that take one of those four abstractions and produces the same type of abstraction. They are, therefore, still transformations. They produce an EventStream from an EventStream, or a DataStream from a DataStream, etc. The second category is those operators that take one of those four abstractions and produces a different type of abstraction. They are not transformers but converters. We can, for example, convert an EventStream to a DataStream.

Transformations

First let’s talk about transformations. After dividing up the abstractions, we now need to divvy up the transformations that were originally available on Observable, by determining which ones apply to which of these more focused abstractions. We’ll find that some still apply in all cases, while others simply don’t make sense in all contexts.

We should first note that, like Observable itself, all four abstractions I identified retain the property of being generics with a single type parameter. Now, let’s consider the simplest transformation, map: a 1-1 conversion transformation that can change that type parameter. This transformation continues to apply to all four abstractions.

We can map an EventStream to an EventStream: this creates a new EventStream, which subscribes to its source EventStream, and for each received event, applies a transformation to produce a new event of a new type, and then publishes it. We can map a DataStream to a DataStream: this creates a new DataStream, where every time we consume one of its values, it first consumes a value of its source DataStream, then applies a transformation and returns the result to us. We can map an ObservableValue: this creates a new ObservableValue whose value is, at any time, the provided transformation of the source ObservableValue (this means it must be read only. We can’t manually set the derived ObservableValue‘s value without breaking this relationship). It therefore updates every time the source ObservableValue updates. We can map a Task to a Task: this is a Task that performs its source Task, then takes the result, transforms it, and returns it as its own result.

We also have the flatMap operator. The name is confusing, and derives from collections: a flatMap of a Collection maps each element to a Collection, then takes the resulting Collection of Collections and flattens it into a single Collection. Really, this is a compound operation that first does a map, then a flatten. The generalizing of this transformation is that it takes an X of X of T, and turns it in an X of T.

The flatten operator, and therefore flatMap, also continues to apply to all of our abstractions. How do we turn an EventStream of EventStreams of T into an EventStream of T? By subscribing to each inner EventStream as it is published by the outer EventStream, and publishing its events to a single EventStream. The resulting EventStream receives all the events from all the inner EventStreams as they become available. How do we turn a DataStream of DataStreams of T into a DataStream of T? When we consume a value, we consume a single DataStream from the outer DataStream, store it, and then supply each value from it on each consumption, until it runs out, then we go consume the next DataStream from the outer DataStream, and repeat. How do we turn an ObservableValue of an ObservableValue of T into an ObservableValue? By making the current value the current value of the current ObservableValue of the outer ObservableValue (which then updates every time either the outer or inner ObservableValue updates). How do we turn a Task that produces a Task that produces a T into a Task that produces a T? We run the outer Task, then take the result inner Task, run it, and return its result.

In Rx land, it was realized that flatten actually has another variation that didn’t come up with ordinary Collections. Each time a new inner Observable is published, we could continue to observe the older inner Observables, or we can stop observing the old one and switch to the new one. This is a slightly different operator called switch, and it leads to the combined operator switchMap. For us, this continues to be the case for only EventStream, because the concept depends on being producer-driven: that streams publish values on their own accord, and we must decide whether to keep listening for them. DataStreams are consumer-driven, so flatMap must get to the end of one inner DataStream before moving to the next. ObservableValue and Task don’t involve multiple values so the concept doesn’t apply there.

Now let’s look at another basic transformation: filter. Does this apply to all the abstractions? No... because filtering is inherently about multiple values: some get through, some don’t. But only two of our four abstractions involve multiple values: EventStream and DataStream. We can therefore meaningfully filter those. But ObservableValue and Task? Filtering makes no sense there, because there’s only one value or result. Any other transformations that inherently involve multiple values (filter is just one, others include buffer or accumulate) therefore only apply to EventStream and DataStream, but not ObservableValue or Task.

Another basic operator is combining: taking multiple abstractions and combining them into a single one. If we have an X of T1 and an X of T2, we may be able to combine them into a single X of a combined T1 and T2 (i.e. a (T1, T2) tuple). Or, if we have a Collection of Xs of Ts, we can combine them into a single X of a Collection of Ts, or possibly a single X of T. Can this apply to all four abstractions? Yes, but we’ll see that for abstractions that involve multiple values, there are multiple ways to “combine” their values, while for the ones that involve on a single value, there’s only one way to “combine” their values.

That means for ObservableValue and Task, there’s one simple combine transformation. A combined ObservableValue is one whose value is the tuple/collection made up of its sources’ values, and therefore it changes when any one of its source values changes. A combined Task is one that runs each one of its source Tasks in parallel, waits for them all to finish, then returns the combined results of all as its own result (notice that, since Task is fundamentally concerned with execution order, this becomes a key feature of one of its transformations).

With EventStream and DataStream, there are multiple ways in which we can combine their values. With EventStream, we can wait for all sources to publish one value, at which point we publish the first combined value, we store these combined values, then each time any source value changes, we update only the published one, keeping all the rest the same, and publish that new combined value. This is the combineLatest operator: each published event represents the most recently published events from each source stream. We can alternatively wait for each source to publish once, at which point we publish the combination, then we don’t save any values, and again wait for all sources to publish again before combining and publishing again. This is the zip operator.

But combineLatest doesn’t make sense for DataStream though, because it based on when each source stream publishes. The “latest” of combineLatest refers to the timing of the source stream events. Since DataStreams are consumer-driven, there is no timing. The DataStream is simply asked to produce a value by a consumer. Therefore, there’s only combine, where when a consumer consumes a value, the combined DataStream then consumes a value from each of its sources, combines then and returns it. This is the zip operator, which continues to apply to DataStream.

Both EventStream and DataStream also ways to combine multiple streams into a single stream of the same type. With EventStream, this is simply the stream that subscribes to multiple sources and publishes when it receives a value from any of them. This is the merge operator. The order in which a merged EventStream publishes is dictated by the order it receives events from its source streams. We can do something similar with DataStream, but since DataStreams are consumer-driven, the transformation has to decide which source to consume from. A merge would be for the DataStream to first consume all the values of the first source stream, then all the values of second one, and so on (thus making it equivalent to a merge on a Collection… we could also call this concatenate to avoid ambiguity). We can also do a roundRobin: each time we consume a value the combined stream consumes one from a particular source, then one from the next one, and so on, wrapping back around after it reaches the end. There are all sorts of ways we can decide how to pick the order of consumption, and a custom algorithm can probably be plugged in as a Strategy to a transformation.

Somewhat surprisingly, I believe that covers it for ObservableValue and Task, with one exception (see below): map, flatten and combine are really the only transformations we can meaningfully do with them, because all other transformations involve either timing or value streams. Most of the remaining transformations from Rx we haven’t talked about will still apply to both EventStream and DataStream, but there are some important ones that only apply to one or the other. Any transformations that involve order apply only to DataStream, for example append or prepend. Any transformations that are driven by timing of the source streams apply only to EventStream, for example debounce or delay. And some transformations are really not transformations but conversions.

The exception I mentioned is for ObservableValue. EventStreams are “hot”, and their transformations are “eager”, and it never makes sense for them to not be (in the realist interpretation of events and “observing”). Derived ObservableValues, however, can be “eager” or “lazy”, and both are perfectly compatible with the abstraction. If we produce one ObservableValue from a sequence of transformations (say, maps and combines) on other ObservableValues, then we can choose to either perform those transformations every time someone reads the value, or we can choose to store the value, and simply serve it up when asked.

I believe the best way to implement this is to have derived ObservableValues be lazy by default: their values get computed from their sources on each read. This also means when there are subscribers to updates, they must subscribe to the source values’ updates, then apply the transformations each time new values are received by the sources. But sometimes this isn’t the performance we want. We might need one of those derived values to be fast and cheap to read. To do that, we can provide the cache operator. This takes an ObservableValue, and creates a new one that stores its value directly. This also requires that it eagerly subscribe to its source value’s updates and use them to update the stored value accordingly. There is also an issue of thread safety: what if we want to read a cached ObservableValue from one thread but it’s being written to on another thread? To handle this we can allow the cache operator to specify how (using a Scheduler) the stored value is updated. These issues of caching and thread safety are unique to observable values.

Converters

Now let’s talk about how we can turn one of these four abstractions into another one of of the four. In total that would be 12 possible converters, assuming that there’s a useful or sensible way to convert each abstraction into each other one.

Let’s start with EventStream as the source.

What does it mean to convert an EventStream into a DataStream? This means we’re talking a producer driven stream and converting it to a consumer driven stream. Remember the key distinction is that EventStreams are defined by timing: the events happen when they do, and subscribers are either subscribed at the time or they miss them. DataStreams are defined by order: the data are returned in a specific sequence, and it’s not possible to “miss” one (you can skip one but that’s a conscious choice of the consumer). Thus, turning an EventStream into a DataStream is fundamentally about storing events as they are received until they are later consumed, ensuring that none can be missed. It is, therefore, buffering. For this reason, this conversion operator is called buffer. It internally builds up a Collection of events received from the EventStream, and when a consumer consumes a value, the first element in the collection is returned immediately. If the Collection is empty, the consumer will be blocked until the next event is received.

What does it means to convert an EventStream to an ObservableValue? It would mean we’re storing the latest event emitted by the stream, so we can query what it is at any time. We call this converter cacheLatest. Note that the latest event must be cached, or else we wouldn’t be able to read it on demand. That’s fundamentally what this converter is doing: taking transient events that are gone right after they occur, and making them persistent values that can be queried as needed. This can be combined with other transformations on EventStream to produce some useful derived converters. For example, if we apply the accumulate operator to the EventStream, then use cacheLatest to produce an ObservableValue, the result is an accumulateTo operator, which stores a running accumulation (perhaps a sum) of incoming values over time.

What does it mean to convert an EventStream to a Task? Well, basically it would mean we create a Task that waits for one or more events to be published, then returns them as the result. But as we will see soon, it makes more sense to create “wait for the next value” Tasks out of DataStreams, and we can already convert EventStreams to DataStreams with buffer. Therefore, this converter would really be a compound conversion of first buffering and then fetching the next value. We can certainly write it as a convenience function, but under the hood it’s just composing other converters.

Now let’s move to DataStream as the source.

What does it mean to convert a DataStream to an EventStream? Well, an EventStream publishes its own events, but a DataStream only returns values when a consumer consumes them. Thus, turning a DataStream into an EventStream involves setting up a consumer to immediately start consuming values, and publish them as soon as they are available. The result is that multiple observers can now listen for those values as they get published, with the caveat that they’ll miss any values if they don’t subscribe at the right time. We can call this conversion broadcast.

What does it mean to convert a DataStream to an ObservableValue? Nothing useful or meaningful, as far as I can tell. Remember meaning of converting an EventStream to an ObservableValue was to cache the latest value. That’s a reference to timing. But timing in a DataStream is controlled by the consumer, so all that could mean is a consumer powers through all the values and saves them to an ObservableValue. The result is a rapidly changing value that then gets stuck on the last value in the DataStream. That doesn’t appear to be a valid concept.

What does it mean to convert a DataStream to a Task? Simple: read the next value! In fact, in .NET, where Task is the return value of all async functions, the return value of the async read method would then have to be a Task. There can, of course, be other related functions to read more than one value. We can also connect an input DataStream to an output DataStream (which, remember, is a valid abstraction but not one carved out from Observable, which only represents sources and not sinks), which results in a Task whose work is to consume values from the input and send them to the output.

Now let’s move to ObservableValue as the source.

What does it mean to convert an ObservableValue to an EventStream? Simple: publish the updates! Now, part of an ObservableValue‘s features is being able to subscribe to updates. How is this related to publishing updates? When we subscribe to an ObservableValue, the subscription is lazy (for derived ObservableValues, the transformations are applied to source values as they arrive), and we have the option of notifying our subscriber immediately with the current value. But when we produce an EventStream from an ObservableValue, remember EventStreams are always hot! It must eagerly subscribe to the ObservableValue and then publish each update it receives. This is significant for derived lazy ObservableValues, because as long as someone holds onto an EventStream produced from it, it has an active subscription and therefore its value is being calculated, which it wouldn’t be if no one was subscribed to it. We can call this converter publishUpdates: it is basically ensuring that updates are eagerly computed and broadcasted so that anyone can observe them as any other EventStream.

What does it mean to convert an ObservableValue to a DataStream? Nothing useful or meaningful that I can think of. At best it would be a stream that lets us read the updates, but that’s just publishUpdates followed by buffer.

What does it mean to convert an ObservableValue to a Task? Again, I can’t think of anything useful or meaningful, that wouldn’t be a composition of other converters.

Now let’s move to Task as the source.

Tasks don’t convert to any of the other abstractions in any meaningful way, because they have only a single return value. Even ObservableValues, which fundamentally represent single values, still have an associated stream of updates. Tasks don’t even have this. For this reason, we can’t derive any kind of meaningful stream from a Task, which means there’s also nothing to observe.

The converters are summarized in the following table (the row is the input, the column is the output):

EventStreamDataStreamObservableValueTask
EventStreamN/AbuffercacheLatestnone
DataStreambroadcastN/Anoneread
ObservableValuepublishUpdatesnoneN/Anone
TasknonenonenoneN/A

There are, in total, only five valid converters.

Conclusion

After separating out the abstractions, we find that the humongous zoo of operators attached to Observable is tidied up into more focused groups of transformations on each of the four abstractions, plus (where applicable) ways to convert from one to the other. What this reveals is that places where an Rx-driven app creates deep confusion over “hotness/coldness” and side effects are areas where an Observable really represents one of these four abstractions but it combined with a transformation operation that does not apply to that abstraction. For example, one true event stream (say, of mouse clicks or other user gestures) appended to another one makes no sense. Nor does trying to merge two observable values into a stream based on which one changes first.

In the final part, we’ll revisit the example app from the first part, rewrite it with our new abstractions and escape the clutches of Rx Hell.

Leave a Reply

Your email address will not be published. Required fields are marked *