Introduction
In the last part, we took a knife to Rx’s one-size-fits-all “Observable” abstraction, and carved out four distinct abstractions, each with unique properties that should not be hidden from whoever is using them.
The true value of Rx, I believe, is in its transformation operators… the almost endless zoo of options for how to turn one Observable into another one. The programming style Rx encourages is to build every stream you need through operators, instead of by creating Subjects and publishing to them “imperatively”.
So this raises the question… what happens to this rich (perhaps too rich, as the sheer volume of them is headache-inducing) language of operators when we cleave the Observable into the EventStream, the DataStream, the ObservableValue and the Task?
What happens is that the operators also get divided, this time into two distinct categories. The first category is those operators that take one of those four abstractions and produce the same type of abstraction. They are, therefore, still transformations. They produce an EventStream from an EventStream, or a DataStream from a DataStream, etc. The second category is those operators that take one of those four abstractions and produce a different type of abstraction. They are not transformers but converters. We can, for example, convert an EventStream to a DataStream.
Transformations
First let’s talk about transformations. After dividing up the abstractions, we now need to divvy up the transformations that were originally available on Observable, by determining which ones apply to which of these more focused abstractions. We’ll find that some still apply in all cases, while others simply don’t make sense in all contexts.
We should first note that, like Observable itself, all four abstractions I identified retain the property of being generics with a single type parameter. Now, let’s consider the simplest transformation, map: a 1-1 conversion transformation that can change that type parameter. This transformation continues to apply to all four abstractions.
We can map an EventStream to an EventStream: this creates a new EventStream, which subscribes to its source EventStream, and for each received event, applies a transformation to produce a new event of a new type, and then publishes it. We can map a DataStream to a DataStream: this creates a new DataStream, where every time we consume one of its values, it first consumes a value of its source DataStream, then applies a transformation and returns the result to us. We can map an ObservableValue: this creates a new ObservableValue whose value is, at any time, the provided transformation of the source ObservableValue. This means it must be read-only: we can’t manually set the derived ObservableValue’s value without breaking this relationship. It therefore updates every time the source ObservableValue updates. We can map a Task to a Task: this is a Task that performs its source Task, then takes the result, transforms it, and returns it as its own result.
We also have the flatMap operator. The name is confusing, and derives from collections: a flatMap of a Collection maps each element to a Collection, then takes the resulting Collection of Collections and flattens it into a single Collection. Really, this is a compound operation that first does a map, then a flatten. The generalization of this transformation is that it takes an X of X of T, and turns it into an X of T.
The flatten operator, and therefore flatMap, also continues to apply to all of our abstractions. How do we turn an EventStream of EventStreams of T into an EventStream of T? By subscribing to each inner EventStream as it is published by the outer EventStream, and publishing its events to a single EventStream. The resulting EventStream receives all the events from all the inner EventStreams as they become available. How do we turn a DataStream of DataStreams of T into a DataStream of T? When we consume a value, we consume a single DataStream from the outer DataStream, store it, and then supply each value from it on each consumption until it runs out; then we consume the next DataStream from the outer DataStream, and repeat. How do we turn an ObservableValue of an ObservableValue of T into an ObservableValue of T? By making its value the current value of the current inner ObservableValue held by the outer ObservableValue (which then updates every time either the outer or inner ObservableValue updates). How do we turn a Task that produces a Task that produces a T into a Task that produces a T? We run the outer Task, then take the resulting inner Task, run it, and return its result.
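The consumer-driven cases of flatten can be sketched in a few lines of Python. This is only an illustration under assumptions: a DataStream is modelled as an iterator, a Task as a zero-argument callable, and the names `flatten_data_stream`, `flatten_task`, `inner` and `outer` are made up for the example.

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


def flatten_data_stream(outer: Iterator[Iterator[T]]) -> Iterator[T]:
    """Drain one inner stream completely before asking the outer for the next."""
    for inner_stream in outer:
        yield from inner_stream


def flatten_task(outer: Callable[[], Callable[[], T]]) -> Callable[[], T]:
    """Run the outer task, then run the task it produced and return its result."""
    return lambda: outer()()


pulled: List[str] = []  # records which inner stream each value was pulled from


def inner(name: str, values: List[int]) -> Iterator[int]:
    for value in values:
        pulled.append(name)
        yield value


def outer() -> Iterator[Iterator[int]]:
    yield inner("a", [1, 2])
    yield inner("b", [3])


flat = flatten_data_stream(outer())
first = next(flat)
pulled_after_first = list(pulled)  # only stream "a" has been touched so far
rest = list(flat)

nested_result = flatten_task(lambda: (lambda: 42))()
```

Nothing is pulled from the second inner stream until the first one is exhausted, which is the ordering the paragraph above describes.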
In Rx land, it was realized that flatten actually has another variation that didn’t come up with ordinary Collections. Each time a new inner Observable is published, we can continue to observe the older inner Observables, or we can stop observing the old one and switch to the new one. This is a slightly different operator called switch, and it leads to the combined operator switchMap. For us, this continues to be the case only for EventStream, because the concept depends on being producer-driven: streams publish values of their own accord, and we must decide whether to keep listening for them. DataStreams are consumer-driven, so flatMap must get to the end of one inner DataStream before moving to the next. ObservableValue and Task don’t involve multiple values, so the concept doesn’t apply there.
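The difference between flatten and switch on an EventStream comes down to whether old subscriptions are kept. The following Python sketch assumes a hypothetical `EventStream` class whose `subscribe` returns an unsubscribe function; it is not any real Rx API.

```python
from typing import Callable, List, Optional


class EventStream:
    """Hypothetical hot stream; subscribe() returns an unsubscribe function."""

    def __init__(self) -> None:
        self._handlers: List[Callable] = []

    def subscribe(self, handler: Callable) -> Callable[[], None]:
        self._handlers.append(handler)

        def unsubscribe() -> None:
            if handler in self._handlers:
                self._handlers.remove(handler)

        return unsubscribe

    def publish(self, event) -> None:
        for handler in list(self._handlers):
            handler(event)


def flatten(outer: EventStream) -> EventStream:
    """Keep listening to every inner stream ever published."""
    result = EventStream()
    outer.subscribe(lambda inner: inner.subscribe(result.publish))
    return result


def switch(outer: EventStream) -> EventStream:
    """Listen only to the most recently published inner stream."""
    result = EventStream()
    current: List[Optional[Callable[[], None]]] = [None]

    def on_inner(inner: EventStream) -> None:
        if current[0] is not None:
            current[0]()  # stop observing the previous inner stream
        current[0] = inner.subscribe(result.publish)

    outer.subscribe(on_inner)
    return result


outer = EventStream()
a, b = EventStream(), EventStream()
merged: List[str] = []
switched: List[str] = []
flatten(outer).subscribe(merged.append)
switch(outer).subscribe(switched.append)

outer.publish(a)
a.publish("a1")
outer.publish(b)
a.publish("a2")  # still seen by flatten, dropped by switch
b.publish("b1")
```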
Now let’s look at another basic transformation: filter. Does this apply to all the abstractions? No... because filtering is inherently about multiple values: some get through, some don’t. But only two of our four abstractions involve multiple values: EventStream and DataStream. We can therefore meaningfully filter those. But ObservableValue and Task? Filtering makes no sense there, because there’s only one value or result. Any other transformations that inherently involve multiple values (filter is just one, others include buffer or accumulate) therefore only apply to EventStream and DataStream, but not ObservableValue or Task.
Another basic operator is combining: taking multiple abstractions and combining them into a single one. If we have an X of T1 and an X of T2, we may be able to combine them into a single X of a combined T1 and T2 (i.e. a (T1, T2) tuple). Or, if we have a Collection of Xs of Ts, we can combine them into a single X of a Collection of Ts, or possibly a single X of T. Can this apply to all four abstractions? Yes, but we’ll see that for abstractions that involve multiple values, there are multiple ways to “combine” their values, while for the ones that involve only a single value, there’s only one way to “combine” their values.
That means for ObservableValue and Task, there’s one simple combine transformation. A combined ObservableValue is one whose value is the tuple/collection made up of its sources’ values, and therefore it changes when any one of its source values changes. A combined Task is one that runs each one of its source Tasks in parallel, waits for them all to finish, then returns the combined results of all as its own result (notice that, since Task is fundamentally concerned with execution order, this becomes a key feature of one of its transformations).
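To make the combined Task concrete, here is a small Python sketch that models a Task as a zero-argument callable and uses a thread pool to run the sources in parallel. The name `combine_tasks` is an assumption for this example, and the sketch assumes at least one source task is given.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Tuple


def combine_tasks(*tasks: Callable[[], object]) -> Callable[[], Tuple[object, ...]]:
    """Build a task that runs all sources in parallel and returns every result."""

    def run() -> Tuple[object, ...]:
        # Assumes at least one task; ThreadPoolExecutor rejects max_workers=0.
        with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
            futures = [pool.submit(task) for task in tasks]
            # result() blocks, so we only return once every source has finished.
            return tuple(future.result() for future in futures)

    return run


combined = combine_tasks(lambda: 1 + 1, lambda: "ab".upper())
results = combined()
```

The results come back in the order the source tasks were passed in, regardless of which one finishes first.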
With EventStream and DataStream, there are multiple ways in which we can combine their values. With EventStream, we can wait for all sources to publish one value, at which point we publish the first combined value. We store the latest value from each source, and each time any source publishes again, we replace only that source’s stored value, keep all the rest the same, and publish the new combined value. This is the combineLatest operator: each published event represents the most recently published events from each source stream. Alternatively, we can wait for each source to publish once, at which point we publish the combination; we then discard those values and wait for all sources to publish again before combining and publishing again. This is the zip operator.
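The two ways of combining EventStreams can be compared side by side in a Python sketch. The `EventStream` class and the function names are assumptions for illustration. One detail the text leaves open is what zip does when one source publishes several times before the other; this sketch queues the unmatched values, which is how Rx’s zip behaves.

```python
from collections import deque
from typing import Callable, List


class EventStream:
    """Hypothetical hot stream used only for this illustration."""

    def __init__(self) -> None:
        self._handlers: List[Callable] = []

    def subscribe(self, handler: Callable) -> None:
        self._handlers.append(handler)

    def publish(self, event) -> None:
        for handler in list(self._handlers):
            handler(event)


def combine_latest(left: EventStream, right: EventStream) -> EventStream:
    result = EventStream()
    latest = {}

    def update(side: str, value) -> None:
        latest[side] = value  # replace only the side that changed
        if len(latest) == 2:  # wait until both sides have published once
            result.publish((latest["left"], latest["right"]))

    left.subscribe(lambda v: update("left", v))
    right.subscribe(lambda v: update("right", v))
    return result


def zip_streams(left: EventStream, right: EventStream) -> EventStream:
    result = EventStream()
    pending = {"left": deque(), "right": deque()}

    def update(side: str, value) -> None:
        pending[side].append(value)
        if pending["left"] and pending["right"]:
            # Pair the oldest unmatched values and drop them afterwards.
            result.publish((pending["left"].popleft(), pending["right"].popleft()))

    left.subscribe(lambda v: update("left", v))
    right.subscribe(lambda v: update("right", v))
    return result


x, y = EventStream(), EventStream()
combined: List[tuple] = []
zipped: List[tuple] = []
combine_latest(x, y).subscribe(combined.append)
zip_streams(x, y).subscribe(zipped.append)

x.publish(1)
y.publish("a")
x.publish(2)
x.publish(3)
y.publish("b")
```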
But combineLatest doesn’t make sense for DataStream, because it is based on when each source stream publishes. The “latest” of combineLatest refers to the timing of the source stream events. Since DataStreams are consumer-driven, there is no timing. The DataStream is simply asked to produce a value by a consumer. Therefore, there’s only one way to combine: when a consumer consumes a value, the combined DataStream consumes a value from each of its sources, combines them and returns the result. This is the zip operator, which continues to apply to DataStream.
Both EventStream and DataStream also have ways to combine multiple streams into a single stream of the same type. With EventStream, this is simply the stream that subscribes to multiple sources and publishes when it receives a value from any of them. This is the merge operator. The order in which a merged EventStream publishes is dictated by the order in which it receives events from its source streams. We can do something similar with DataStream, but since DataStreams are consumer-driven, the transformation has to decide which source to consume from. A merge would be for the DataStream to first consume all the values of the first source stream, then all the values of the second one, and so on (thus making it equivalent to a merge on a Collection… we could also call this concatenate to avoid ambiguity). We can also do a roundRobin: each time we consume a value, the combined stream consumes one from a particular source, then one from the next one, and so on, wrapping back around after it reaches the end. There are all sorts of ways we can decide how to pick the order of consumption, and a custom algorithm can probably be plugged in as a Strategy to a transformation.
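Both orderings are easy to sketch with Python iterators standing in for DataStreams. The function names are assumptions for this example, and the text does not say what roundRobin should do once a source runs out; here an exhausted source is simply dropped from the rotation.

```python
from itertools import chain
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def concatenate(*sources: Iterable[T]) -> Iterator[T]:
    """Consume the first source to the end, then the second, and so on."""
    return chain(*sources)


def round_robin(*sources: Iterable[T]) -> Iterator[T]:
    """Take one value from each source in turn, wrapping around."""
    active = [iter(source) for source in sources]
    while active:
        still_active = []
        for source in active:
            try:
                yield next(source)
            except StopIteration:
                continue  # assumption: exhausted sources leave the rotation
            still_active.append(source)
        active = still_active


in_sequence = list(concatenate([1, 2, 3], ["a"], [10, 20]))
interleaved = list(round_robin([1, 2, 3], ["a"], [10, 20]))
```

A pluggable Strategy, as suggested above, would replace the fixed rotation with a function that picks which source to pull from next.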
Somewhat surprisingly, I believe that covers it for ObservableValue and Task, with one exception (see below): map, flatten and combine are really the only transformations we can meaningfully do with them, because all other transformations involve either timing or value streams. Most of the remaining transformations from Rx we haven’t talked about will still apply to both EventStream and DataStream, but there are some important ones that only apply to one or the other. Any transformations that involve order apply only to DataStream, for example append or prepend. Any transformations that are driven by timing of the source streams apply only to EventStream, for example debounce or delay. And some transformations are really not transformations but conversions.
The exception I mentioned is for ObservableValue. EventStreams are “hot”, and their transformations are “eager”, and it never makes sense for them to not be (in the realist interpretation of events and “observing”). Derived ObservableValues, however, can be “eager” or “lazy”, and both are perfectly compatible with the abstraction. If we produce one ObservableValue from a sequence of transformations (say, maps and combines) on other ObservableValues, then we can choose to either perform those transformations every time someone reads the value, or we can choose to store the value, and simply serve it up when asked.
I believe the best way to implement this is to have derived ObservableValues be lazy by default: their values get computed from their sources on each read. This also means that when there are subscribers to updates, the derived value must subscribe to the source values’ updates, then apply the transformations each time new values are received from the sources. But sometimes this isn’t the performance we want. We might need one of those derived values to be fast and cheap to read. To do that, we can provide the cache operator. This takes an ObservableValue, and creates a new one that stores its value directly. This also requires that it eagerly subscribe to its source value’s updates and use them to update the stored value accordingly. There is also an issue of thread safety: what if we want to read a cached ObservableValue from one thread while it’s being written to on another thread? To handle this we can allow the cache operator to specify how (using a Scheduler) the stored value is updated. These issues of caching and thread safety are unique to observable values.
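The trade-off between lazy and cached derived values can be shown with a small Python sketch that counts how often the transformation runs. The class names (`ObservableValue`, `MappedValue`, `CachedValue`) are assumptions for this example, and the Scheduler parameter is omitted, so this version is not thread-safe.

```python
from typing import Callable, List


class ObservableValue:
    """A settable value that notifies subscribers when it changes."""

    def __init__(self, initial) -> None:
        self._value = initial
        self._handlers: List[Callable] = []

    @property
    def value(self):
        return self._value

    def set(self, new_value) -> None:
        self._value = new_value
        for handler in list(self._handlers):
            handler(new_value)

    def subscribe(self, handler: Callable) -> None:
        self._handlers.append(handler)


class MappedValue:
    """Lazy and read-only: recomputes from its source on every read."""

    def __init__(self, source, transform: Callable) -> None:
        self._source = source
        self._transform = transform

    @property
    def value(self):
        return self._transform(self._source.value)

    def subscribe(self, handler: Callable) -> None:
        # Subscribe to the source and transform each update as it arrives.
        self._source.subscribe(lambda v: handler(self._transform(v)))


class CachedValue:
    """Eager: stores the value and keeps it current through a subscription."""

    def __init__(self, source) -> None:
        self._value = source.value     # one computation up front
        source.subscribe(self._store)  # eager subscription to updates

    def _store(self, new_value) -> None:
        self._value = new_value

    @property
    def value(self):
        return self._value


calls: List[int] = []


def expensive_square(n: int) -> int:
    calls.append(n)  # record every run of the transformation
    return n * n


base = ObservableValue(3)
lazy = MappedValue(base, expensive_square)
lazy_reads = [lazy.value, lazy.value]  # recomputed on each read
calls_after_lazy_reads = len(calls)

cached = CachedValue(lazy)
cached_reads = [cached.value, cached.value, cached.value]  # no recomputation
calls_after_cached_reads = len(calls)

base.set(4)  # the update is pushed through the transformation once
value_after_update = cached.value
```

Reading the lazy value twice runs the transformation twice, while the cached value pays once at creation and once per source update, no matter how often it is read.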
Converters
Now let’s talk about how we can turn one of these four abstractions into another one of the four. In total that would be 12 possible converters, assuming that there’s a useful or sensible way to convert each abstraction into each other one.
Let’s start with EventStream as the source.
What does it mean to convert an EventStream into a DataStream? This means we’re taking a producer-driven stream and converting it to a consumer-driven stream. Remember the key distinction is that EventStreams are defined by timing: the events happen when they do, and subscribers are either subscribed at the time or they miss them. DataStreams are defined by order: the data are returned in a specific sequence, and it’s not possible to “miss” one (you can skip one but that’s a conscious choice of the consumer). Thus, turning an EventStream into a DataStream is fundamentally about storing events as they are received until they are later consumed, ensuring that none can be missed. It is, therefore, buffering. For this reason, this conversion operator is called buffer. It internally builds up a Collection of events received from the EventStream, and when a consumer consumes a value, the first element in the collection is returned immediately. If the Collection is empty, the consumer will be blocked until the next event is received.
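A blocking queue gives most of the buffer converter for free. The Python sketch below uses the standard library’s `queue.Queue`; the `EventStream` and `BufferedDataStream` classes are assumptions for illustration, and the buffer here is unbounded, which a real implementation would probably want to limit.

```python
import queue
import threading
from typing import Callable, List


class EventStream:
    """Hypothetical hot stream used only for this illustration."""

    def __init__(self) -> None:
        self._handlers: List[Callable] = []

    def subscribe(self, handler: Callable) -> None:
        self._handlers.append(handler)

    def publish(self, event) -> None:
        for handler in list(self._handlers):
            handler(event)


class BufferedDataStream:
    """Stores every event from the source until a consumer asks for it."""

    def __init__(self, source: EventStream) -> None:
        self._events: queue.Queue = queue.Queue()
        source.subscribe(self._events.put)  # start storing immediately

    def next(self):
        # Returns the oldest stored event, or blocks until one arrives.
        return self._events.get()


keys = EventStream()
stream = BufferedDataStream(keys)

keys.publish("a")
keys.publish("b")
first_two = [stream.next(), stream.next()]  # already buffered, returned at once

# The buffer is now empty, so the next read blocks until the timer publishes.
timer = threading.Timer(0.05, keys.publish, args=("c",))
timer.start()
third = stream.next()
timer.join()
```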
What does it mean to convert an EventStream to an ObservableValue? It would mean we’re storing the latest event emitted by the stream, so we can query what it is at any time. We call this converter cacheLatest. Note that the latest event must be cached, or else we wouldn’t be able to read it on demand. That’s fundamentally what this converter is doing: taking transient events that are gone right after they occur, and making them persistent values that can be queried as needed. This can be combined with other transformations on EventStream to produce some useful derived converters. For example, if we apply the accumulate operator to the EventStream, then use cacheLatest to produce an ObservableValue, the result is an accumulateTo operator, which stores a running accumulation (perhaps a sum) of incoming values over time.
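Here is a minimal Python sketch of cacheLatest and the derived accumulateTo. All names are assumptions for this example. One detail the text does not address is what the value should be before the first event arrives; this sketch requires an explicit initial value.

```python
from typing import Callable, List


class EventStream:
    """Hypothetical hot stream with an accumulate transformation."""

    def __init__(self) -> None:
        self._handlers: List[Callable] = []

    def subscribe(self, handler: Callable) -> None:
        self._handlers.append(handler)

    def publish(self, event) -> None:
        for handler in list(self._handlers):
            handler(event)

    def accumulate(self, initial, step: Callable) -> "EventStream":
        """Publish the running accumulation each time the source publishes."""
        result = EventStream()
        state = [initial]

        def on_event(event) -> None:
            state[0] = step(state[0], event)
            result.publish(state[0])

        self.subscribe(on_event)
        return result


class LatestValue:
    """Read-only ObservableValue holding the most recent event of a stream."""

    def __init__(self, source: EventStream, initial) -> None:
        self._value = initial  # assumption: value to report before any event
        source.subscribe(self._store)

    def _store(self, event) -> None:
        self._value = event

    @property
    def value(self):
        return self._value


def cache_latest(source: EventStream, initial) -> LatestValue:
    return LatestValue(source, initial)


def accumulate_to(source: EventStream, initial, step: Callable) -> LatestValue:
    """accumulate followed by cacheLatest, as a single converter."""
    return cache_latest(source.accumulate(initial, step), initial)


purchases = EventStream()
last_purchase = cache_latest(purchases, None)
total = accumulate_to(purchases, 0, lambda running, amount: running + amount)

for amount in (5, 7, 1):
    purchases.publish(amount)
```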
What does it mean to convert an EventStream to a Task? Well, basically it would mean we create a Task that waits for one or more events to be published, then returns them as the result. But as we will see soon, it makes more sense to create “wait for the next value” Tasks out of DataStreams, and we can already convert EventStreams to DataStreams with buffer. Therefore, this converter would really be a compound conversion of first buffering and then fetching the next value. We can certainly write it as a convenience function, but under the hood it’s just composing other converters.
Now let’s move to DataStream as the source.
What does it mean to convert a DataStream to an EventStream? Well, an EventStream publishes its own events, but a DataStream only returns values when a consumer consumes them. Thus, turning a DataStream into an EventStream involves setting up a consumer that immediately starts consuming values and publishes them as soon as they are available. The result is that multiple observers can now listen for those values as they get published, with the caveat that they’ll miss any values published before they subscribe. We can call this conversion broadcast.
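A broadcast converter needs its own consumer, which the following Python sketch models as a background thread. The names are assumptions for this example, and the function returns the worker thread alongside the stream purely so the demonstration can wait for it to finish. The late subscriber shows the caveat: it never sees the value published before it subscribed.

```python
import queue
import threading
from typing import Callable, Iterator, List, Tuple


class EventStream:
    """Hypothetical hot stream used only for this illustration."""

    def __init__(self) -> None:
        self._handlers: List[Callable] = []

    def subscribe(self, handler: Callable) -> None:
        self._handlers.append(handler)

    def publish(self, event) -> None:
        for handler in list(self._handlers):
            handler(event)


def broadcast(source: Iterator) -> Tuple[EventStream, threading.Thread]:
    """Start consuming the source right away and publish each value."""
    stream = EventStream()

    def pump() -> None:
        for value in source:
            stream.publish(value)

    worker = threading.Thread(target=pump, daemon=True)
    worker.start()
    return stream, worker


# A DataStream backed by a blocking queue; None marks the end of the data.
lines: queue.Queue = queue.Queue()
stream, worker = broadcast(iter(lines.get, None))

early: List[str] = []
late: List[str] = []
first_seen = threading.Event()
stream.subscribe(early.append)
stream.subscribe(lambda _: first_seen.set())

lines.put("one")
first_seen.wait()              # "one" has now been published
stream.subscribe(late.append)  # subscribes too late to see "one"
lines.put("two")
lines.put(None)
worker.join()
```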
What does it mean to convert a DataStream to an ObservableValue? Nothing useful or meaningful, as far as I can tell. Remember that the meaning of converting an EventStream to an ObservableValue was to cache the latest value. That’s a reference to timing. But timing in a DataStream is controlled by the consumer, so all that could mean is that a consumer powers through all the values and saves them to an ObservableValue. The result is a rapidly changing value that then gets stuck on the last value in the DataStream. That doesn’t appear to be a valid concept.
What does it mean to convert a DataStream to a Task? Simple: read the next value! In fact, in .NET, where Task is the usual return type of async functions, the return value of the async read method would then have to be a Task. There can, of course, be other related functions to read more than one value. We can also connect an input DataStream to an output DataStream (which, remember, is a valid abstraction but not one carved out from Observable, which only represents sources and not sinks), which results in a Task whose work is to consume values from the input and send them to the output.
Now let’s move to ObservableValue as the source.
What does it mean to convert an ObservableValue to an EventStream? Simple: publish the updates! Now, part of an ObservableValue’s features is being able to subscribe to updates. How is this related to publishing updates? When we subscribe to an ObservableValue, the subscription is lazy (for derived ObservableValues, the transformations are applied to source values as they arrive), and we have the option of notifying our subscriber immediately with the current value. But when we produce an EventStream from an ObservableValue, remember that EventStreams are always hot! It must eagerly subscribe to the ObservableValue and then publish each update it receives. This is significant for derived lazy ObservableValues, because as long as someone holds onto an EventStream produced from one, it has an active subscription and therefore its value is being calculated, which it wouldn’t be if no one was subscribed to it. We can call this converter publishUpdates: it basically ensures that updates are eagerly computed and broadcast so that anyone can observe them like any other EventStream.
What does it mean to convert an ObservableValue to a DataStream? Nothing useful or meaningful that I can think of. At best it would be a stream that lets us read the updates, but that’s just publishUpdates followed by buffer.
What does it mean to convert an ObservableValue to a Task? Again, I can’t think of anything useful or meaningful that wouldn’t be a composition of other converters.
Now let’s move to Task as the source.
Tasks don’t convert to any of the other abstractions in any meaningful way, because they have only a single return value. Even ObservableValues, which fundamentally represent single values, still have an associated stream of updates. Tasks don’t even have this. For this reason, we can’t derive any kind of meaningful stream from a Task, which means there’s also nothing to observe.
The converters are summarized in the following table (the row is the input, the column is the output):
Input \ Output | EventStream | DataStream | ObservableValue | Task |
EventStream | N/A | buffer | cacheLatest | none |
DataStream | broadcast | N/A | none | read |
ObservableValue | publishUpdates | none | N/A | none |
Task | none | none | none | N/A |
There are, in total, only five valid converters.
Conclusion
After separating out the abstractions, we find that the humongous zoo of operators attached to Observable is tidied up into more focused groups of transformations on each of the four abstractions, plus (where applicable) ways to convert from one to the other. What this reveals is that the places where an Rx-driven app creates deep confusion over “hotness/coldness” and side effects are areas where an Observable really represents one of these four abstractions but is combined with a transformation operation that does not apply to that abstraction. For example, one true event stream (say, of mouse clicks or other user gestures) appended to another one makes no sense. Nor does trying to merge two observable values into a stream based on which one changes first.
In the final part, we’ll revisit the example app from the first part, rewrite it with our new abstractions and escape the clutches of Rx Hell.