The RxJS library comes with a rich set of operators that cover most of the cases we meet when dealing with observable streams.
There are still situations, though, where the ability to build new custom operators comes in handy. Let’s look at one particular case, inspired by a question posted on Stack Overflow.
Let’s consider a scenario where we have a stream of records, for instance coming from a car’s “black box”, which is flowing into a receiving system that has to store them into a database as fast as possible.
The receiving system has an API, saveRec, which can accept an array of records to be stored and responds, asynchronously, when all records have been written. Unfortunately we also have a constraint: saveRec cannot be called concurrently. In other words, once we call the API we have to wait for its response before calling it again.

When the level of concurrency is limited to 1, the typical RxJS approach is to use the concatMap operator (which is nothing but mergeMap with the concurrent parameter set to 1).
Using concatMap in this case may not be the optimal solution, though. Let’s imagine, for instance, that the time interval between the records received is shorter than the time saveRec takes to respond. In this case we would end up accumulating records that are waiting to be stored (such records are held in a buffer internal to the concatMap operator, but we will come back to this point later).
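To make the limitation concrete, here is a minimal sketch of the plain concatMap approach. The saveRec below is only a hypothetical stand-in for the real API (it records each call and responds when the respond helper is invoked), and the Rec type and record names are assumptions made for the example.

```typescript
import { Observable, Subject } from 'rxjs';
import { concatMap } from 'rxjs/operators';

type Rec = string;

// Hypothetical stand-in for the saveRec API: it logs every call and
// responds only when respond() is invoked.
const calls: Rec[][] = [];
const pending: Array<() => void> = [];
function saveRec(recs: Rec[]): Observable<Rec[]> {
  calls.push(recs);
  return new Observable<Rec[]>((subscriber) => {
    pending.push(() => {
      subscriber.next(recs);
      subscriber.complete();
    });
  });
}
// Simulates the API answering the oldest outstanding call.
const respond = (): void => {
  pending.shift()?.();
};

const records = new Subject<Rec>();
const saved: Rec[][] = [];
records
  .pipe(concatMap((rec) => saveRec([rec])))
  .subscribe((recs) => saved.push(recs));

records.next('rec_3'); // saveRec(['rec_3']) starts immediately
records.next('rec_4'); // queued inside concatMap
records.next('rec_5'); // queued inside concatMap
respond(); // rec_3 written, saveRec(['rec_4']) starts
respond(); // rec_4 written, saveRec(['rec_5']) starts
respond(); // rec_5 written
// calls is [['rec_3'], ['rec_4'], ['rec_5']]: one call per record
```

Even though rec_4 and rec_5 were both waiting when the first call returned, they are still saved with two separate calls.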

Considering that saveRec can accept an array of records, it would be ideal to invoke it with all the records buffered during the execution of the previous call.
For instance, if we receive rec_4 and rec_5 while executing the saveRec operation on rec_3, then in the next invocation we would love to pass both rec_4 and rec_5 to be saved.

And this is what we are going to implement with the new custom operator bufferConcatMap. Strictly speaking, bufferConcatMap is a Pipeable Operator Factory Function, since it is a function that, when invoked, returns a Pipeable Operator, which is then passed as a parameter to the pipe method of Observable.
Before starting the implementation, it is worth defining what an RxJS Pipeable Operator is and, as a consequence, what a Pipeable Operator Factory Function is.
A Pipeable Operator is a function that accepts an Observable as input and returns an Observable as output (the RxJS code represents a Pipeable Operator type with the interface OperatorFunction<T, R> or one of its extensions).
A Pipeable Operator Factory Function is a function that returns a Pipeable Operator.
So our new custom operator bufferConcatMap is a function that returns a function that expects an Observable as input and returns an Observable.
To be precise this is a Pipeable Operator Factory Function, but from now on we will simply call it a Custom Operator, since this is the generally accepted way to refer to this kind of function.
A simple Custom Operator can be coded like this.
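What follows is a minimal sketch of such an operator rather than a definitive implementation; the name mirror is hypothetical and chosen only for this example.

```typescript
import { Observable, of } from 'rxjs';

// Hypothetical name: a Custom Operator that just mirrors its source.
function mirror<T>() {
  // The returned function is the Pipeable Operator: Observable in, Observable out
  // (its shape matches OperatorFunction<T, T>).
  return (source: Observable<T>): Observable<T> =>
    new Observable<T>((subscriber) => {
      // Runs on every subscription: subscribe to the source and forward everything.
      const subscription = source.subscribe({
        next: (value) => subscriber.next(value),
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      // Teardown: unsubscribing downstream also unsubscribes from the source.
      return () => subscription.unsubscribe();
    });
}

const mirrored: number[] = [];
of(1, 2, 3)
  .pipe(mirror<number>())
  .subscribe((value) => mirrored.push(value));
// mirrored is [1, 2, 3]
```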

This Custom Operator creates a new Observable which mirrors the source Observable. In other words, it does nothing but still shows how the mechanism works.
Now, let’s assume we want to transform each value notified by a source Observable and then pass the transformed value, together with its index, to the subscriber (in other words we want to build the RxJS map operator from scratch).
This can be done by:
- passing to the Pipeable Operator Factory Function the function to be used to transform each value received from the source
- leveraging the JavaScript “closure” concept to define a variable (index in this example) that is initialized any time the returned Observable is subscribed and is incremented any time the next function is called on the subscriber
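The two points above can be sketched as follows. The name myMap is hypothetical (it avoids a clash with the real map), and the sketch assumes that, as in the real map, the index is handed to the transforming function.

```typescript
import { Observable, of } from 'rxjs';

// Hypothetical name for a map-like operator built from scratch.
function myMap<T, R>(project: (value: T, index: number) => R) {
  return (source: Observable<T>): Observable<R> =>
    new Observable<R>((subscriber) => {
      // Closure state: a fresh index is created on every subscription.
      let index = 0;
      const subscription = source.subscribe({
        next: (value) => {
          let result: R;
          try {
            result = project(value, index++);
          } catch (err) {
            // An error thrown by the transformation is sent down the error channel.
            subscriber.error(err);
            return;
          }
          subscriber.next(result);
        },
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      return () => subscription.unsubscribe();
    });
}

const labelled = of('a', 'b', 'c').pipe(
  myMap((value: string, index: number) => `${index}:${value}`),
);
const firstRun: string[] = [];
const secondRun: string[] = [];
labelled.subscribe((value) => firstRun.push(value));
labelled.subscribe((value) => secondRun.push(value));
// Both runs yield ['0:a', '1:b', '2:c']: the index restarts at each subscription.
```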

It is interesting to note that, leveraging JavaScript closures, we have attached some “state” (the index variable) to a function, the next function defined by the subscriber. The ability to add state to a function will turn out to be important in the implementation of bufferConcatMap.
Now that we have clarified the basics of custom operators, let’s start building our bufferConcatMap.
The expected behaviour
The expected behaviour, using the example described above, can be expressed with a marble diagram.

Expressed in words we could say:
- if upstream notifies and there is no processing on the fly, then the value notified is immediately processed
- if upstream notifies and there is processing on the fly, then the value notified is stored in a buffer
- as soon as processing completes, if there is something in the buffer it gets processed immediately, otherwise we wait for the next element to start a new round of processing
For the sake of simplicity, this description in words does not cover the error and complete cases (but they are handled in the code).
We need some state
Yes, we need to hold some state information for our operator to work as expected:
- we need to have a buffer to hold the values that come from upstream while we are still processing a request;
- “While we are processing a request”? This means that we need to have a way to know whether we are processing a request.
So our state is composed of 2 variables:
- a buffer (an Array) to hold the incoming items that we cannot process yet;
- a processing flag (a boolean) that records whether there is a request on the fly or not.
The anatomy of the operator
The bufferConcatMap operator is actually a Pipeable Operator Factory Function which expects a project function as input parameter and returns a Pipeable Operator.
The input parameter
The project function is a function that expects an array of values as input and returns an Observable. Why an array of values as input? Because bufferConcatMap stores the values received in a buffer, which is an array, and passes this buffer to the project function as soon as there is no processing on the fly.
Why return an Observable? Because bufferConcatMap behaves as a variation of concatMap and, as concatMap does, it expects the function passed as input to return an Observable.
The returned Pipeable Operator
The Pipeable Operator returned as output is itself a function that expects a source Observable as input and returns a “new transformed Observable”.
The “new transformed Observable” behaves similarly to the one created with concatMap if no request is on the fly: it invokes the project function and waits for its response before making the next invocation, the difference being that the project function is invoked with a buffer of values coming from the source Observable (upstream) and not with just one value. On the other hand, if there is a request already on the fly, it takes the values notified by the source Observable and stores them in its internal buffer without invoking the project function (in other words with no effect downstream).
The code of the operator
Here, finally, comes the full code of our custom operator.
And here is an example of how to use it:
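What follows is a minimal sketch of the operator and of its usage, reconstructed from the description in this article rather than copied from the repo. The saveRec and respond helpers are hypothetical stand-ins for the real API, and the error case is simply left to propagate downstream.

```typescript
import { EMPTY, Observable, Subject } from 'rxjs';
import { concatMap, tap } from 'rxjs/operators';

function bufferConcatMap<T, R>(project: (values: T[]) => Observable<R>) {
  // The Pipeable Operator returned by the factory function.
  return (source: Observable<T>): Observable<R> => {
    // State of the operator, shared by the functions defined below.
    let bufferedNotifications: T[] = [];
    let processing = false;
    return source.pipe(
      // Every value notified by upstream goes straight into the buffer.
      tap((value) => bufferedNotifications.push(value)),
      // concatMap calls this function once per upstream value, but only
      // after the previous inner Observable has completed.
      concatMap(() => {
        // Nothing to do if a request is running or the buffer was already flushed.
        if (processing || bufferedNotifications.length === 0) return EMPTY;
        processing = true;
        const batch = bufferedNotifications;
        bufferedNotifications = [];
        return project(batch).pipe(
          tap({ complete: () => { processing = false; } }),
        );
      }),
    );
  };
}

// Hypothetical stand-in for saveRec: logs each call, responds on respond().
const calls: string[][] = [];
const pending: Array<() => void> = [];
function saveRec(recs: string[]): Observable<number> {
  calls.push(recs);
  return new Observable<number>((subscriber) => {
    pending.push(() => {
      subscriber.next(recs.length);
      subscriber.complete();
    });
  });
}
const respond = (): void => {
  pending.shift()?.();
};

const records = new Subject<string>();
const savedCounts: number[] = [];
const newTransformerObservable = records.pipe(
  bufferConcatMap((recs: string[]) => saveRec(recs)),
);
newTransformerObservable.subscribe((count) => savedCounts.push(count));

records.next('rec_3'); // saveRec(['rec_3']) starts immediately
records.next('rec_4'); // buffered
records.next('rec_5'); // buffered
respond(); // rec_3 written, saveRec(['rec_4', 'rec_5']) starts
respond(); // both buffered records written with a single call
// calls is [['rec_3'], ['rec_4', 'rec_5']], savedCounts is [1, 2]
```

In this sketch the buffering of the values is done by tap, while concatMap takes care of waiting for the previous request; the check on the empty buffer skips the upstream values that have already been sent as part of an earlier batch.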
Let’s look at some key concepts that stand at the basis of this logic.
The most external function (bufferConcatMap). The most external function, a Pipeable Operator Factory Function, is invoked as part of the invocation of pipe, of which it is a parameter (in other words, looking at the example above, when newTransformerObservable is created).
The Pipeable Operator. The function returned by bufferConcatMap, the real Pipeable Operator, is invoked by pipe itself, which passes it the source Observable. So it also runs when newTransformerObservable is created; what waits until newTransformerObservable is subscribed is the subscription to the source Observable.
Internal state of the operator. When the Pipeable Operator is invoked, the variables holding the state of the operator (bufferedNotifications and processing in our case) are initialized. These variables are part of the lexical scope of any function defined within the body of the Pipeable Operator (for instance the functions passed as parameters to the concatMap operator embedded in the implementation of bufferConcatMap) and are therefore shared among all the invocations of such functions, which occur any time the source Observable (upstream) notifies a new value.
project and EMPTY. The implementation of bufferConcatMap is a variation on concatMap that uses some internal state to implement the desired behaviour. concatMap requires, as input, a function that returns an Observable. If some processing is on the fly when a new value is notified by upstream, bufferConcatMap just has to buffer that value and do nothing else. The Observable that does nothing is the EMPTY Observable: it emits no values and simply completes as soon as it is subscribed. This is why bufferConcatMap uses EMPTY when processing is true. On the other hand, if there is no processing going on, the project function is invoked with the values stored in the buffer, as long as there are values in it.
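As a small illustration of why EMPTY fits the “do nothing” case, the sketch below first subscribes to EMPTY directly and then returns it from a concatMap projection for some of the values (the even/odd rule is just an assumption made for the example).

```typescript
import { EMPTY, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

// EMPTY notifies no value: it only completes, right at subscription.
const events: string[] = [];
EMPTY.subscribe({
  next: () => events.push('next'),
  complete: () => events.push('complete'),
});
// events is ['complete']

// Returning EMPTY from the projection means "nothing goes downstream
// for this value", and concatMap moves straight on to the next one.
const emitted: number[] = [];
of(1, 2, 3, 4)
  .pipe(concatMap((n) => (n % 2 === 0 ? of(n * 10) : EMPTY)))
  .subscribe((value) => emitted.push(value));
// emitted is [20, 40]
```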
All is good so far, at least it looks so. But there is something we overlooked.
Let’s assume we create an Observable using bufferConcatMap and then subscribe to it twice, concurrently, like this:
const newObs = source.pipe(bufferConcatMap((val) => process(val)));
newObs.subscribe(values => {
  // do something
});
newObs.subscribe(values => {
  // do something else
});
In this case, the state variables (bufferedNotifications and processing) are initialized just once, as part of the pipe call, in other words when newObs is created.
So, the same variables are shared within both subscriptions, which is wrong and can generate very subtle errors. We need to make sure that each subscription has its own copy of state variables which are initialized at subscription time.
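The effect is easier to see with a much smaller operator. The sketch below uses a hypothetical withSharedCount operator, invented for this illustration, that keeps a counter in the body of the Pipeable Operator, exactly where our state variables live.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical operator: pairs each value with a running count.
function withSharedCount<T>() {
  return (source: Observable<T>): Observable<[number, T]> => {
    // Initialized once, when pipe() invokes this function,
    // and not once per subscription.
    let count = 0;
    return source.pipe(map((value: T): [number, T] => [++count, value]));
  };
}

const counted = of('a', 'b').pipe(withSharedCount<string>());
const first: number[] = [];
const second: number[] = [];
counted.subscribe(([n]) => first.push(n));
counted.subscribe(([n]) => second.push(n));
// first is [1, 2] but second is [3, 4]: the second subscription
// carries on from the state left behind by the first one.
```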
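Luckily, in this case the RxJS defer function comes to our rescue: the function we pass to defer is executed at every subscription, so the state variables declared inside it are created anew for each subscriber. So the final correct version of the operator is the following.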
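Here is a minimal sketch of the defer-based version, again reconstructed from the description in this article and not copied from the repo. The saveRec and respond helpers are hypothetical stand-ins used to drive the example with two concurrent subscriptions.

```typescript
import { defer, EMPTY, Observable, Subject } from 'rxjs';
import { concatMap, tap } from 'rxjs/operators';

function bufferConcatMap<T, R>(project: (values: T[]) => Observable<R>) {
  return (source: Observable<T>): Observable<R> =>
    // The function passed to defer runs at every subscription,
    // so each subscriber gets its own copy of the state.
    defer(() => {
      let bufferedNotifications: T[] = [];
      let processing = false;
      return source.pipe(
        tap((value) => bufferedNotifications.push(value)),
        concatMap(() => {
          if (processing || bufferedNotifications.length === 0) return EMPTY;
          processing = true;
          const batch = bufferedNotifications;
          bufferedNotifications = [];
          return project(batch).pipe(
            tap({ complete: () => { processing = false; } }),
          );
        }),
      );
    });
}

// Hypothetical stand-in for saveRec: logs each call, responds on respond().
const calls: string[][] = [];
const pending: Array<() => void> = [];
function saveRec(recs: string[]): Observable<number> {
  calls.push(recs);
  return new Observable<number>((subscriber) => {
    pending.push(() => {
      subscriber.next(recs.length);
      subscriber.complete();
    });
  });
}
const respond = (): void => {
  pending.shift()?.();
};

const source = new Subject<string>();
const newObs = source.pipe(bufferConcatMap((recs: string[]) => saveRec(recs)));
const seenByA: number[] = [];
const seenByB: number[] = [];
newObs.subscribe((count) => seenByA.push(count));
newObs.subscribe((count) => seenByB.push(count));

source.next('rec_1'); // each subscription starts its own saveRec(['rec_1'])
source.next('rec_2'); // buffered separately by each subscription
source.next('rec_3'); // buffered separately by each subscription
respond(); // first subscription: rec_1 done, saveRec(['rec_2', 'rec_3']) starts
respond(); // second subscription: rec_1 done, saveRec(['rec_2', 'rec_3']) starts
// calls is [['rec_1'], ['rec_1'], ['rec_2', 'rec_3'], ['rec_2', 'rec_3']]
```

With the state declared outside defer, the second subscription would find processing already set to true by the first one and would skip rec_1 altogether.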
In most cases the operators provided by the RxJS library are sufficient, but sometimes the ability to build custom operators comes in handy. When this is the case, we have to remember that Observables are just specialized functions, and operators are just functions that work on such specialized functions.
Building a custom operator is therefore an exercise in function composition that has to follow a limited set of rules. Using closures we can add internal state to operators to enhance their capabilities while keeping that state hidden from the outside.
All the code behind bufferConcatMap, including the tests, can be found in this repo.