An Interesting Case for Using a Custom RxJs Operator | by Enrico Piccinin | Jun, 2022

The RxJs library comes with a rich set of operators that cover most of the cases we have to address when dealing with observable streams.

There are still situations, though, where the ability to build new custom operators comes in handy. Let’s look at one particular case, inspired by a question posted on Stack Overflow.

Let’s consider a scenario where we have a stream of records, for instance coming from a car’s “black box”, which is flowing into a receiving system that has to store them into a database as fast as possible.

The receiving system has an API, saveRec, which accepts an array of records to be stored and responds, asynchronously, when all records have been written. Unfortunately, we also have a constraint: saveRec cannot be called concurrently. In other words, once we call the API, we have to wait for its response before calling it again.

[Figure: The stream of records is received and saved with sequential calls to the saveRec API]

When the level of concurrency is limited to 1, the typical RxJs approach is to use the concatMap operator (which is nothing but mergeMap with its concurrent parameter set to 1).

Using concatMap in this case may not be the optimal solution, though. Let’s imagine, for instance, that the time interval between the records received is shorter than the time saveRec needs to respond. In this case we would end up accumulating records that are waiting to be stored (such records are held in a buffer internal to the concatMap operator, but we will come back to this point later).
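
The concatMap approach can be sketched as follows. This is only an illustration: the Rec and SaveRec types, the saveOneByOne helper and the in-memory fakeSaveRec are assumptions made for the example, not part of the real receiving system.

```typescript
import { Observable, of } from "rxjs";
import { concatMap } from "rxjs/operators";

type Rec = string;
// Assumed shape of the saveRec API: it takes an array of records and returns
// an Observable that notifies and completes once they have been written.
type SaveRec = (records: Rec[]) => Observable<string>;

// With concatMap each record gets its own saveRec call, and the next call
// starts only after the previous one has completed.
function saveOneByOne(records$: Observable<Rec>, saveRec: SaveRec): Observable<string> {
  return records$.pipe(concatMap((rec) => saveRec([rec])));
}

// Hypothetical in-memory stand-in for the real API, used only for this demo.
const batchSizes: number[] = [];
const fakeSaveRec: SaveRec = (records) => {
  batchSizes.push(records.length);
  return of(`saved ${records.join(", ")}`);
};

saveOneByOne(of("rec_1", "rec_2", "rec_3"), fakeSaveRec).subscribe((msg) => console.log(msg));
console.log(batchSizes); // every call carried exactly one record
```

However many records pile up while a call is pending, each of them still travels alone in its own call.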

[Figure: concatMap buffers some records while executing the saveRec operation]

Considering that saveRec can accept an array of records, it would be ideal to invoke it with all the records buffered during the execution of the previous call.

For instance, if we receive rec_4 and rec_5 while executing the saveRec operation on rec_3, then in the next invocation we would love to pass both rec_4 and rec_5 to be saved.

[Figure: bufferConcatMap invokes saveRec with all the records accumulated in its buffer]

And this is what we are going to implement with the new custom operator bufferConcatMap. Strictly speaking, bufferConcatMap is a Pipeable Operator Factory Function, since it is a function that, when invoked, returns a Pipeable Operator, which is then passed as a parameter to the pipe method of Observable.

Before starting the implementation, it is worth defining what an RxJs Pipeable Operator is and, as a consequence, what a Pipeable Operator Factory Function is.

A Pipeable Operator is a function that accepts an Observable as input and returns an Observable as output (the RxJs code represents a Pipeable Operator type with the interface OperatorFunction<T, R> or one of its extensions).

A Pipeable Operator Factory Function is a function that returns a Pipeable Operator.

So our new custom operator bufferConcatMap is a function that returns a function that expects an Observable as input and returns an Observable.
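
The two definitions can be made concrete with the built-in map. A small sketch (the names double, doubledDirect and doubledPiped are chosen here for illustration): map plays the role of the factory, and the value it returns is a plain function from Observable to Observable.

```typescript
import { Observable, OperatorFunction, of } from "rxjs";
import { map } from "rxjs/operators";

// map is a Pipeable Operator Factory Function: calling it returns a Pipeable Operator.
const double: OperatorFunction<number, number> = map((n: number) => n * 2);

// A Pipeable Operator is just a function from Observable to Observable,
// so it can be called directly...
const doubledDirect: Observable<number> = double(of(1, 2, 3));

// ...or handed to pipe, which calls it with the source on our behalf.
const doubledPiped: Observable<number> = of(1, 2, 3).pipe(double);

const direct: number[] = [];
const piped: number[] = [];
doubledDirect.subscribe((n) => direct.push(n));
doubledPiped.subscribe((n) => piped.push(n));
console.log(direct, piped); // both arrays hold 2, 4, 6
```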

While, to be precise, this is a Pipeable Operator Factory Function, from now on we will simply call it a Custom Operator, since this is the generally accepted way to refer to this kind of function.

A simple Custom Operator can be coded like this.

A custom operator (more precisely a “Pipeable Operator Factory Function”) which mirrors the source Observable
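
A minimal sketch of such a mirroring operator, assuming RxJS 6 or later. The name mirror is hypothetical and chosen only for this example.

```typescript
import { MonoTypeOperatorFunction, Observable, of } from "rxjs";

// Hypothetical name: a custom operator that simply mirrors its source.
function mirror<T>(): MonoTypeOperatorFunction<T> {
  // The returned function is the Pipeable Operator.
  return (source: Observable<T>) =>
    new Observable<T>((subscriber) => {
      // This body runs on every subscription: subscribe upstream and
      // forward every notification downstream unchanged.
      const subscription = source.subscribe({
        next: (value) => subscriber.next(value),
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      // Teardown: leave the source when downstream unsubscribes.
      return () => subscription.unsubscribe();
    });
}

const mirrored: number[] = [];
let mirrorCompleted = false;
of(1, 2, 3)
  .pipe(mirror<number>())
  .subscribe({
    next: (value) => mirrored.push(value),
    complete: () => (mirrorCompleted = true),
  });
console.log(mirrored, mirrorCompleted);
```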

This Custom Operator creates a new Observable which mirrors the source Observable. In other words, it does nothing but still shows how the mechanism works.

Now, let’s assume we want to transform each value notified by a source Observable, taking its index into account, and then pass the transformed value to the subscriber (in other words, we want to build the RxJS map operator from scratch).

This can be done by

  • passing to the Pipeable Operator Factory Function the function to be used to transform each value received from the source
  • leveraging the JavaScript “closure” concept to define a variable (the variable index in this example) that is initialized every time the returned Observable is subscribed and is incremented every time the next function is called on the subscriber
The “map” operator built from scratch
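
The two points above can be sketched like this. The name mapFromScratch is an assumption made to avoid a clash with the real map; the index variable lives in the closure created at subscription time.

```typescript
import { Observable, OperatorFunction, of } from "rxjs";

// Hypothetical name, chosen to avoid clashing with the real map operator.
function mapFromScratch<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R> {
  return (source: Observable<T>) =>
    new Observable<R>((subscriber) => {
      // State held in the closure: re-initialized on every subscription.
      let index = 0;
      const subscription = source.subscribe({
        next: (value) => {
          try {
            // Transform the value, then move the index forward.
            subscriber.next(project(value, index++));
          } catch (err) {
            subscriber.error(err);
          }
        },
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      return () => subscription.unsubscribe();
    });
}

const labelled$ = of("a", "b", "c").pipe(
  mapFromScratch((value: string, index: number) => `${index}:${value}`)
);
const firstRun: string[] = [];
const secondRun: string[] = [];
labelled$.subscribe((v) => firstRun.push(v));
labelled$.subscribe((v) => secondRun.push(v));
console.log(firstRun, secondRun); // the index restarts from 0 for each subscription
```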

It is interesting to note that, by leveraging JavaScript closures, we have attached some “state” (the index variable) to a function: the next function defined by the subscriber. The ability to add state to a function will turn out to be important in the implementation of bufferConcatMap.

Now that we have clarified the basics of custom operators, let’s start building our bufferConcatMap.

The expected behaviour

The expected behavior, using the example described above, can be expressed with a marble diagram.

[Figure: Marble diagram of bufferConcatMap]

Expressed in words we could say:

  • if upstream notifies and there is no processing in flight, then the value notified is immediately processed
  • if upstream notifies and there is processing in flight, then the value notified is stored in a buffer
  • as soon as processing completes, if there is something in the buffer it gets immediately processed; otherwise we wait for the next element to start a new processing

For the sake of simplicity, we do not cover in the “words of the requirements” the cases of error and complete (but they are in the code).

We need some state

Yes, we need to hold some state information for our operator to work as expected:

  • we need to have a buffer to hold the values that come from upstream while we are still processing a request;
  • “while we are still processing a request” means that we also need a way to know whether a request is being processed.

So our state is composed of two variables:

  • a buffer (an Array) to hold incoming items that we cannot process yet;
  • a processing flag (a boolean) that records whether there is a request in flight or not.

The anatomy of the operator

The bufferConcatMap operator is actually a Pipeable Operator Factory Function which expects a project function as input parameter and returns a Pipeable Operator.

The input parameter

The project function is a function that expects an array of values as input and returns an Observable. Why an array of values as input? Because bufferConcatMap stores the values received in a buffer, which is an array, and passes this buffer to the project function as soon as there is no processing in flight.

Why return an Observable? Because bufferConcatMap behaves as a variation of concatMap and, as concatMap does, it expects that the function passed as input is a function that returns an Observable.

The returned Pipeable Operator

The Pipeable Operator returned as output is itself a function that expects a source Observable as input and returns a “new transformed Observable”.

The “new transformed Observable” behaves similarly to the one created with concatMap if no request is in flight: it invokes the project function and waits for its response before making the next invocation, the difference being that the project function is invoked with a buffer of values coming from the source Observable (upstream) and not with just one value. On the other hand, if there is a request already in flight, it takes the values notified by the source Observable and stores them in its internal buffer without invoking the project function (in other words, with no effect downstream).

The code of the operator

Here, finally, comes the full code of our custom operator.

bufferConcatMap custom operator

And here is an example of how to use it:

Example of use of bufferConcatMap
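
The operator described so far, together with an example of its use, could be sketched as follows. This is a reconstruction from the description in this article rather than a definitive implementation; saveRec, its 250 ms delay and the interval-based source are hypothetical stand-ins.

```typescript
import { EMPTY, Observable, OperatorFunction, interval, of } from "rxjs";
import { concatMap, delay, map, take, tap } from "rxjs/operators";

function bufferConcatMap<T, R>(project: (values: T[]) => Observable<R>): OperatorFunction<T, R> {
  // This returned function is the Pipeable Operator.
  return (source: Observable<T>) => {
    // State of the operator, kept in the closure.
    let bufferedNotifications: T[] = [];
    let processing = false;

    return source.pipe(
      // Every value notified by upstream goes into the buffer first.
      tap((value) => bufferedNotifications.push(value)),
      concatMap(() => {
        // Nothing to do if a request is in flight or if an earlier
        // invocation has already drained the buffer.
        if (processing || bufferedNotifications.length === 0) {
          return EMPTY;
        }
        // Hand the whole buffer to project and start a fresh one.
        const batch = bufferedNotifications;
        bufferedNotifications = [];
        processing = true;
        return project(batch).pipe(
          // The request is over when the projected Observable completes.
          tap({
            complete: () => {
              processing = false;
            },
          })
        );
      })
    );
  };
}

// Hypothetical stand-in for the real API: responds 250 ms after being called.
function saveRec(records: string[]): Observable<string> {
  return of(`saved [${records.join(", ")}]`).pipe(delay(250));
}

// Hypothetical source: one record every 100 ms, faster than saveRec responds.
const source = interval(100).pipe(
  take(6),
  map((i) => `rec_${i + 1}`)
);

const newTransformerObservable = source.pipe(
  bufferConcatMap((records: string[]) => saveRec(records))
);
newTransformerObservable.subscribe((message) => console.log(message));
```

Since records arrive faster than the stand-in saveRec responds, the calls after the first one should carry more than one record each.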

Let’s look at some key concepts that underlie this logic.

The most external function (bufferConcatMap). The most external function, a Pipeable Operator Factory Function, is invoked when the arguments of pipe are evaluated (in other words, looking at the example above, when newTransformerObservable is created).

The Pipeable Operator. The function returned by bufferConcatMap, the real Pipeable Operator, is then invoked by pipe itself, which passes it the source Observable. The Observable it returns does not do any work until newTransformerObservable is subscribed.

Internal state of the operator. When the Pipeable Operator is invoked, the variables holding the state of the operator (bufferedNotifications and processing in our case) are initialized. These variables are part of the lexical scope of any function defined within the body of the Pipeable Operator (for instance the functions passed as parameters to the concatMap operator embedded in the implementation of bufferConcatMap) and are therefore shared among all the invocations of such functions, which occur every time the source Observable (upstream) notifies a new value.

project and EMPTY. The implementation of bufferConcatMap is a variation on concatMap that uses some internal state to implement the desired behaviour. concatMap requires, as input, a function that returns an Observable. If some processing is in flight when a new value is notified by upstream, bufferConcatMap just has to buffer that value and do nothing else. The Observable that does nothing is the EMPTY Observable: it emits no values and completes immediately when subscribed. This is why bufferConcatMap uses EMPTY when processing is true. On the other hand, if there is no processing going on, the project function is invoked with the values stored in the buffer, as long as there are values in it.
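
The behaviour of EMPTY can be checked in isolation. In this small sketch the events array is just a recording device introduced for the example.

```typescript
import { EMPTY } from "rxjs";

// Record every notification received from EMPTY.
const events: string[] = [];
EMPTY.subscribe({
  next: () => events.push("next"),
  complete: () => events.push("complete"),
});

// EMPTY never calls next: it completes as soon as it is subscribed.
console.log(events);
```

Returning EMPTY from the function passed to concatMap therefore adds nothing downstream and lets concatMap move on to the next queued value.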

All is good so far, at least it looks so. But there is something we overlooked.

Let’s assume we create an Observable using bufferConcatMap and then subscribe to it twice, concurrently, like this:

    const newObs = source.pipe(bufferConcatMap((val) => process(val)));
    newObs.subscribe(values => {
      // do something
    });
    newObs.subscribe(values => {
      // do something else
    });

In this case, the state variables (bufferedNotifications and processing) are initialized only once, when the Pipeable Operator returned by bufferConcatMap is invoked by pipe, in other words when newObs is created.

So the same variables are shared by both subscriptions, which is wrong and can generate very subtle errors. We need to make sure that each subscription has its own copy of the state variables, initialized at subscription time.
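
The effect is easy to reproduce with a much smaller operator. In this sketch countShared is a hypothetical operator, introduced only for illustration, that keeps a counter in the same position as our state variables.

```typescript
import { Observable, OperatorFunction, of } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical operator that numbers the notifications it sees.
function countShared<T>(): OperatorFunction<T, number> {
  return (source: Observable<T>) => {
    // Initialized once, when pipe invokes this function, not per subscription.
    let count = 0;
    return source.pipe(map(() => ++count));
  };
}

const counted$ = of("a", "b").pipe(countShared<string>());
const first: number[] = [];
const second: number[] = [];
counted$.subscribe((n) => first.push(n));
counted$.subscribe((n) => second.push(n));
console.log(first, second); // first holds 1, 2 while second holds 3, 4
```

The second subscription continues counting where the first one stopped, because both close over the same variable.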

Luckily, in this case the RxJS defer function comes to our rescue: it builds the Observable anew, by calling a factory function, on every subscription. So the final correct version of the operator is the following.

Final version of bufferConcatMap code which also works with concurrent subscriptions
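
A sketch of what the defer-based version could look like, again reconstructed from the description in this article rather than copied from the original listing. The demo at the bottom is an assumption made for illustration: it uses a Subject as source and a project function returning NEVER, which models a request that has not responded yet.

```typescript
import { EMPTY, NEVER, Observable, OperatorFunction, Subject, defer } from "rxjs";
import { concatMap, tap } from "rxjs/operators";

function bufferConcatMap<T, R>(project: (values: T[]) => Observable<R>): OperatorFunction<T, R> {
  return (source: Observable<T>) =>
    // The factory passed to defer runs once per subscription,
    // so every subscriber gets its own buffer and its own flag.
    defer(() => {
      let bufferedNotifications: T[] = [];
      let processing = false;

      return source.pipe(
        // Every value notified by upstream goes into the buffer first.
        tap((value) => bufferedNotifications.push(value)),
        concatMap(() => {
          // Nothing to do if a request is in flight or the buffer is empty.
          if (processing || bufferedNotifications.length === 0) {
            return EMPTY;
          }
          const batch = bufferedNotifications;
          bufferedNotifications = [];
          processing = true;
          return project(batch).pipe(
            tap({
              complete: () => {
                processing = false;
              },
            })
          );
        })
      );
    });
}

// Demo: two concurrent subscriptions to the same Observable.
const source = new Subject<string>();
const batches: string[][] = [];
const newObs = source.pipe(
  bufferConcatMap((records: string[]) => {
    batches.push(records);
    return NEVER; // hypothetical request that stays in flight
  })
);
newObs.subscribe();
newObs.subscribe();
source.next("rec_1");
console.log(batches); // each subscription issued its own call with rec_1
```

With state shared between subscriptions, the second subscriber would find processing already set to true and would skip its call; with defer, both calls are made.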

In most cases the operators provided by the RxJS library are sufficient, but sometimes the ability to build custom operators comes in handy. When that is the case, we have to remember that Observables are just specialized functions, and operators are just functions that work on such specialized functions.

Building a custom operator is therefore an exercise in function composition that has to follow a limited set of rules. Using closures we can add inner state to operators to enhance their capabilities while maintaining their external stateless behaviour.

All the code behind bufferConcatMap, including the tests, can be found in this repo.
