Meet defer

In a previous article, we looked at how to create our own RxJS statements. Now I want to talk about a little-known creation operator - defer- and explain how you can use it for some situations


Suppose you need to make an operator that takes a function and executes it only once, the first time it receives a value. We implement it under the name tapOnce


function tapOnce<T>(fn: Function): OperatorFunction<T, T> {
  return function(source: Observable<any>) {
    let run = false;
    return source.pipe(
      tap(() => {
        if (!run) {
          fn();
          run = true;
        }
      })
    );
  };
}

The code is clear - it is tapused to run the function, the flag is runneeded to do this only once. Now we use the operator.


const source = interval(5000).pipe(
  tapOnce(() => console.log('+')
));

source.subscribe(console.log);

Everything works, the plus sign is displayed in the console only at the first emite. Now add the subscribers.


const source = interval(5000).pipe(
  tapOnce(() => console.log('+')
));

source.subscribe(console.log);
source.subscribe(console.log);

If you look at the console - there is only one plus. The problem is that both subscribers use the same lexical environment and refer to the same variable run. We need a way to postpone the creation of a thread until someone subscribes.


Will help defer


import { defer } from 'rxjs';

function tapOnce<T>(fn: Function): OperatorFunction<T, T> {
  return function(source: Observable<any>) {
    return defer(() => {
      let run = false;
      return source.pipe(
        tap(() => {
          if (!run) {
            fn();
            run = true;
          }
        })
      );
    });
  };
}

The operator deferaccepts a function that should return ObservableInput. The code inside deferwill be executed only upon subscription, and not during creation. Using this approach and thanks to js closure, each subscriber uses his own lexical environment.


Let's create our simple implementation deferfor a better understanding.


function defer(observableFactory: () => ObservableInput<any>) {
  return new Observable(subscriber => {
    const source = observableFactory();
    return source.subscribe(subscriber);
  });
}

defer returns a new stream, which is created at the moment of subscription by the factory function, and will be used as a source.


Here are more examples where it will be useful defer. Say we have an expression that needs to be counted when someone signs up. for instance


const randNum = of(Math.random());

randNum.subscribe(console.log);
randNum.subscribe(console.log);

In this example, each subscriber will receive the same random value. You can correct it so that the expression is counted when subscribing, not when announcing.


const randNum = defer(() => of(Math.random()));

randNum2.subscribe(console.log);
randNum2.subscribe(console.log);

// The same concept as using a function
const randNum = () => of(Math.random());
randNum2().subscribe(console.log);
randNum2().subscribe(console.log);

Another example is when you need to delay the execution of a promise.


// This already executing regardless the numbers of handlers
const promise = new Promise(resolve => {
  resolve();
});

// Deferring the creation of the promise until someone subscribes
const promiseDefered = defer(() => {
  return new Promise(resolve => {
    resolve();
  });
});

promiseDefered.subscribe(console.log);

Promises are executed immediately, regardless of the number of listeners. You can make a promise look like a stream (i.e. lazy) using defer.


All Articles