We study multicast RxJS operators

Hello, Habr! I present to you the translation of the article "Understanding RxJS Multicast Operators" by Netanel Basal.

Multicast (broadcast) operators often seem like the hardest RxJS topic to learn. In this article I will try to explain everything in an accessible way.

We will look at how multicast operators work internally and at the problems they solve.

Let's start by describing the basic building blocks of RxJS.

Observable


In RxJS, observables (hereinafter referred to as “streams”) are cold by default. This means that every time you subscribe to a stream, its subscription callback is executed.

For a better understanding, let's create the following implementation:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

The Observable constructor accepts a single parameter: the subscription callback, subscriptionFn. It is called every time we subscribe to the stream (subscribe()).

The subscription callback is sometimes also called the producer, since it “produces” values for the subscriber (the observer object in our code).

The subscribe() method takes an observer as input. It is an object with three methods of its own: next(), error(), and complete(). In real RxJS, you can pass three functions instead of an object.

When called, the subscribe() method calls the subscription function and passes the observer to it.

We have not mentioned the unsubscribe method yet, but keep in mind that each subscription provides a way to destroy it. Most often, the subscription callback returns a function (or an object with an appropriate method) that, when called, breaks the connection between the stream and its subscriber.
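The teardown idea can be sketched with the same minimal Observable. The listeners set and the emit() helper below are hypothetical stand-ins for a real event source; this is only an illustration, not how RxJS implements subscriptions.

```javascript
// Same minimal Observable as in the article.
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

// Hypothetical event source: listeners live in a Set,
// and emit() pushes a value to each of them.
const listeners = new Set();
const emit = value => listeners.forEach(listener => listener(value));

const events$ = new Observable(observer => {
  const listener = value => observer.next(value);
  listeners.add(listener);

  // Teardown: break the link between the source and this subscriber.
  return () => listeners.delete(listener);
});

const received = [];
const unsubscribe = events$.subscribe({ next: value => received.push(value) });

emit('a');
emit('b');
unsubscribe();
emit('c'); // nobody is listening anymore

console.log(received); // ['a', 'b']
```

Here the subscription callback returns a plain function, which is the simplest of the two options mentioned above.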

This is all pretty simple. Let's move closer to reality now. For example, let's wrap the native XHR API in a stream:


function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });
    xhr.open('GET', url);
    xhr.send();
    
    return () => xhr.abort()
  }
  
  return new Observable(subscriptionFn);
}

We wrote an http function that receives a URL, executes an HTTP request, and returns a stream that emits the received HTTP response.

Now, looking at our implementation, what do you think will happen when we subscribe to this stream twice?


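// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  },
  // http() and Subject also call these, so they must exist
  error(err) { console.error(`${tag}:`, err); },
  complete() {}
});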

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

That's right: two HTTP requests will be executed. If we look again at the implementation of the Observable class, we will see why. Each subscriber calls the subscription callback, which in turn performs an HTTP request each time.



Operators


An operator is a function that takes a stream as input, performs some action, and returns a stream.

Let's write our first operator:

function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
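        next(value) {
          observer.next(fn(value));
        },
        // Forward errors and completion unchanged
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }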
      });
    });
  };
}

The map() function returns an operator that accepts the source stream and returns a stream in which every passing value is run through the function fn.

That is, inside it there is always a subscription to the input stream.

Before using this new operator, we need to attach it to the stream somehow. Let's extend our Observable class with a pipe() method:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

A simple method, just one line of code. pipe takes a list of operators and calls them in turn, passing each one the result of the previous one as input.
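As a small illustration of that reduce call, the sketch below pipes a synchronous stream through two map operators and compares the result with the same chain composed by hand. The numbers$ stream and the addOne and double operators are made up for this example, and map is shortened to forward only next().

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

// Shortened map operator: only next() is forwarded here.
const map = fn => source =>
  new Observable(observer =>
    source.subscribe({ next: value => observer.next(fn(value)) })
  );

// Hypothetical synchronous source used only for this example.
const numbers$ = new Observable(observer => {
  [1, 2, 3].forEach(value => observer.next(value));
});

const addOne = map(x => x + 1);
const double = map(x => x * 2);

// pipe() applies the operators left to right...
const piped = [];
numbers$.pipe(addOne, double).subscribe({ next: value => piped.push(value) });

// ...which is the same as nesting the calls by hand.
const manual = [];
double(addOne(numbers$)).subscribe({ next: value => manual.push(value) });

console.log(piped);  // [4, 6, 8]
console.log(manual); // [4, 6, 8]
```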

Let's use our operator:

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

When subscribe is called, we subscribe to the output stream of map(), and map, in turn, subscribes to the source stream inside itself.

The http stream emits a value, and it lands in map. Then the function fn is executed, and the stream returned by map emits the result to the final subscriber. This is how an observable chain works.



If we subscribe to a chain twice, each subscription in the chain will be called twice.

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
    .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));
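To make the double execution visible without a network call, here is a sketch that replaces http with a hypothetical synchronous source and counts how often each subscription callback runs. The executions counter and the sample user data are assumptions made for the illustration.

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

// Counters for how many times each subscription callback runs.
const executions = { source: 0, map: 0 };

// Hypothetical synchronous stand-in for the http() stream.
const users$ = new Observable(observer => {
  executions.source++;
  observer.next([{ name: 'Ann' }, { name: 'Bob' }]);
});

const map = fn => source =>
  new Observable(observer => {
    executions.map++;
    return source.subscribe({ next: value => observer.next(fn(value)) });
  });

const firstUser$ = users$.pipe(map(users => users[0]));

const names = [];
firstUser$.subscribe({ next: user => names.push(user.name) });
firstUser$.subscribe({ next: user => names.push(user.name) });

console.log(executions); // { source: 2, map: 2 }
console.log(names);      // ['Ann', 'Ann']
```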



What if this behavior does not suit us? What if we want the subscription function to be called only once, no matter how many subscribers there are?

For example, what if we want to make one HTTP request and use the result for all subscribers? In this case you need a Subject.

Subjects


Subject is both a stream and a subscriber. It is a stream because it has a subscribe() method, and a subscriber because it implements the subscriber interface: the next(), error(), and complete() methods.

Let's write it.

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

Subject can act as an intermediary between a cold stream and many subscribers.

Let's change our example as follows:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

When subject.subscribe(someFn) is called, only one simple operation is performed: someFn is added to the subject.observers array.

Then, since Subject also behaves as a subscriber, you can subscribe it to the source stream. When the source stream emits a value, subject.next() is called, which passes this value on to each of the subject's subscribers.

Now the original subscription callback is executed only once, so only one HTTP request is made.
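The same point can be checked synchronously. In this sketch a hypothetical source$ stands in for the http stream, and a counter records how many times its subscription callback runs. The Subject is trimmed to next() only.

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

// Trimmed-down Subject from the article (next() only).
class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }
}

let executions = 0;

// Hypothetical stand-in for the http() stream.
const source$ = new Observable(observer => {
  executions++;
  observer.next('response');
});

const subject = new Subject();
const log = [];
subject.subscribe({ next: value => log.push(`subscriber1: ${value}`) });
subject.subscribe({ next: value => log.push(`subscriber2: ${value}`) });

// The subject is the only direct subscriber of the source.
source$.subscribe(subject);

console.log(executions); // 1
console.log(log); // ['subscriber1: response', 'subscriber2: response']
```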



Late to the party


What happens if the source stream has already emitted its values before we subscribed?

We cannot demonstrate this with the previous example: http is asynchronous, so even if you subscribe to the subject right after subscribing it to the source, the value will still arrive after the subscription.

Let's quickly write a creation function, of:


function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

A stream created with of() emits values synchronously, one after another. We will subscribe to subject after it has already been subscribed to of.

const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Our subscribers received nothing. Why? Our implementation does not support “late” subscribers. When the source stream from of() emits its values, no subscribers are registered yet, so these values go nowhere.

In real Angular applications, it may well happen that the source stream has already emitted, but your component is not yet present on the page. When the component appears, it subscribes to the source but does not receive the values that have already passed.

One way to solve this problem is ReplaySubject. Let's sketch our own version and see how it works.


class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

The concept is simple. As the name implies, ReplaySubject is a special Subject that can replay old values to every new subscriber.

Each emitted value is passed to all current subscribers and saved for future ones. The buffer size, bufferSize, is set in the constructor.

Let's rewrite the previous example with ReplaySubject.

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

The result has changed.

Despite subscribing late, both subscribers received all the values.
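The buffer size matters once more values have been emitted than the buffer can hold. The sketch below uses a standalone variant of the ReplaySubject above (it does not extend Subject, only to stay self-contained) with a buffer of 2, so a late subscriber sees only the last two values.

```javascript
// Standalone variant of the article's ReplaySubject.
class ReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }

  subscribe(observer) {
    // Replay the buffered values first, then register the observer.
    this.buffer.forEach(value => observer.next(value));
    this.observers.push(observer);
  }

  next(value) {
    // Drop the oldest value once the buffer is full.
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

const subject = new ReplaySubject(2);
[1, 2, 3].forEach(value => subject.next(value));

// Late subscriber: only the last two values are replayed.
const late = [];
subject.subscribe({ next: value => late.push(value) });
const replayed = [...late];
console.log(replayed); // [2, 3]

// Values emitted after subscribing arrive as usual.
subject.next(4);
console.log(late); // [2, 3, 4]
```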



To summarize, the purpose of ReplaySubject is to distribute values to all subscribers and cache them for future “late” subscribers.

Before moving on, I recommend that you try writing your own implementation of BehaviorSubject. You can find the finished code at the end of the article.
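For comparison once you have tried it yourself, here is one possible minimal sketch. It is not the author's finished code and not the RxJS implementation: it only assumes that a BehaviorSubject stores the latest value (starting with an initial one) and hands it to every new subscriber.

```javascript
// Minimal BehaviorSubject sketch: remembers only the latest value.
class BehaviorSubject {
  constructor(initialValue) {
    this.value = initialValue;
    this.observers = [];
  }

  subscribe(observer) {
    // A new subscriber immediately receives the current value.
    observer.next(this.value);
    this.observers.push(observer);
  }

  next(value) {
    this.value = value;
    this.observers.forEach(observer => observer.next(value));
  }
}

const subject = new BehaviorSubject(0);

const early = [];
subject.subscribe({ next: value => early.push(value) });

subject.next(1);
subject.next(2);

const late = [];
subject.subscribe({ next: value => late.push(value) });

console.log(early); // [0, 1, 2]
console.log(late);  // [2]
```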

Now we finally move on to multicast operators. I hope the above examples will help you understand them faster.

Multicast operators


Multicast and Connect


The multicast() operator uses a Subject to deliver the values of the source stream to several subscribers.


import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

multicast returns a ConnectableObservable object that has a connect method. Its purpose is to subscribe the supplied subject to the source stream.

The connect method lets us decide when to start executing the source stream. One thing to keep in mind: to unsubscribe from the source, you need to call:

connectableSubscription.unsubscribe();
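To connect this with the toy classes from earlier, here is a rough sketch of what a multicast operator could look like on top of them. It is a simplification written for this article's Observable and Subject, not the RxJS source: it accepts only a subject instance, and connect() simply subscribes that subject to the source.

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }
}

// Simplified multicast: subscribers attach to the subject,
// and connect() subscribes the subject to the source.
function multicast(subject) {
  return source => {
    const connectable = new Observable(observer => subject.subscribe(observer));
    connectable.connect = () => source.subscribe(subject);
    return connectable;
  };
}

let executions = 0;
const source$ = new Observable(observer => {
  executions++;
  [1, 2].forEach(value => observer.next(value));
});

const connectable$ = source$.pipe(multicast(new Subject()));

const received = [];
connectable$.subscribe({ next: value => received.push(`a:${value}`) });
connectable$.subscribe({ next: value => received.push(`b:${value}`) });

// Nothing has been emitted yet: the source starts only on connect().
const beforeConnect = received.length;
connectable$.connect();

console.log(beforeConnect); // 0
console.log(received);      // ['a:1', 'b:1', 'a:2', 'b:2']
console.log(executions);    // 1
```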

We are not limited to a plain Subject. You can use any derived class instead, for example ReplaySubject:

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

From this code you can guess what will happen under the hood.

When we use multicast, we can pass not only a Subject but also a factory function that returns a new Subject each time.

A subject that has already completed cannot be reused; the factory function solves this problem.

interval(1000).pipe(
  multicast(() => new Subject())
)

RefCount


When we use the multicast() operator, we are responsible for calling connect() to start executing the source observable. On top of that, we have to watch for possible memory leaks by manually unsubscribing from the connectable subscription.

Automating this process would avoid errors and simplify the code. The kind RxJS developers thought of this for us and created the refCount operator.

refCount counts subscriptions, and when the first one appears, it calls connect(), i.e. subscribes to the source. When the count drops back to zero, it unsubscribes from the source.

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)
 
// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubscribe();
}, 3000)

Note that after refCount we get a regular observable, not a ConnectableObservable.
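The counting logic can be sketched on top of the toy classes as well. Everything below is a simplified illustration, not the RxJS implementation: the Subject here returns a teardown function from subscribe(), multicast accepts only a subject instance, and the active flag is a made-up way to observe whether the source is currently subscribed.

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
    // Teardown: remove this observer again.
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }
}

function multicast(subject) {
  return source => {
    const connectable = new Observable(observer => subject.subscribe(observer));
    connectable.connect = () => source.subscribe(subject);
    return connectable;
  };
}

// Simplified refCount: connect on the first subscriber,
// disconnect when the last one leaves.
function refCount() {
  return connectable => {
    let count = 0;
    let disconnect;

    return new Observable(observer => {
      const unsubscribe = connectable.subscribe(observer);
      if (++count === 1) {
        disconnect = connectable.connect();
      }

      return () => {
        unsubscribe();
        if (--count === 0) {
          disconnect();
        }
      };
    });
  };
}

// Hypothetical source that only tracks whether it is subscribed.
let active = false;
const source$ = new Observable(() => {
  active = true;
  return () => {
    active = false;
  };
});

const shared$ = source$.pipe(multicast(new Subject()), refCount());

const states = [active];                                 // before any subscriber
const unsubscribe1 = shared$.subscribe({ next() {} });   // count: 1, connects
states.push(active);
const unsubscribe2 = shared$.subscribe({ next() {} });   // count: 2
unsubscribe1();                                          // count: 1
states.push(active);
unsubscribe2();                                          // count: 0, disconnects
states.push(active);

console.log(states); // [false, true, true, false]
```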

Publish and its options


multicast() + Subject + refCount() is a fairly typical combination in RxJS, and the developers have condensed it and its variations into single operators.

Let's see what options we have.

  • publish() is equivalent to multicast(() => new Subject())
    const connectableObservable = interval(1000).pipe(
      publish()
    )
    
    connectableObservable.connect();
    

  • publishBehavior() is equivalent to multicast(new BehaviorSubject())
    const connectableObservable = interval(1000).pipe(
      publishBehavior(100)
    )
    
    connectableObservable.connect();
    

  • publishReplay() is equivalent to multicast(() => new ReplaySubject(x))
    const connectableObservable = interval(1000).pipe(
      publishReplay(3)
    )
    
    connectableObservable.connect();
    

  • publishLast() is equivalent to multicast(new AsyncSubject())
    const connectableObservable = interval(1000).pipe(
      take(2),
      publishLast()
    )
    
    connectableObservable.connect();
    

  • share() is equivalent to multicast(() => new Subject()) + refCount()
    const source = interval(1000).pipe(
      share()
    )
    

  • shareReplay(bufferSize) is a multicast operator that uses ReplaySubject. It does not use multicast() internally, and its result is a regular observable, not a ConnectableObservable. It can be used with or without refCount. Here are both options:

    interval(1000).pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    )
    
    interval(1000).pipe(
      shareReplay(1)
    )
    

Calling shareReplay with { refCount: false } is the same as calling shareReplay(x).

In this case, there is no reference counting. This means that until the source stream completes, shareReplay stays subscribed to it, regardless of whether shareReplay itself has any subscribers. All new subscribers receive the last x values.

shareReplay vs publishReplay + refCount


At first glance, shareReplay({ refCount: true, bufferSize: X }) is identical to publishReplay(X) + refCount(), but this is not entirely true.

Let's see what the similarities and the differences are.

They have the same refCount behavior: subscribing to and unsubscribing from the source stream based on the number of subscribers. They also react the same way when the source stream completes: all new subscribers receive the last X values.

However, they differ if the source stream has not yet completed and the subscriber count has dropped to zero. With publishReplay(X) + refCount(), new subscribers receive the last X values from the buffer, and the source is then re-subscribed using the same ReplaySubject.
With shareReplay({ refCount: true, bufferSize: X }), new subscribers do not receive the last X values, because internally it creates a new ReplaySubject and uses it to re-subscribe to the source.

Examples illustrating this:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subscriber-1'));

setTimeout(() => {
  one.unsubscribe();

  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subscriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subscriber-1'));

setTimeout(() => {
  one.unsubscribe();

  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subscriber-2'));
}, 3000);





Real-world examples in Angular


Let's see how to use the multicast operators we have studied in real-world conditions.

Using share


Suppose we have a component that needs data from the original stream. It could be an http request, a state, or whatever. And we also need data manipulation, such as filtering, sorting, etc.

@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => users /* filter / sort here */),
    );
  }
}

Now we need another component that shows only the first user. If we simply subscribe to the source stream as is, we get:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => users /* filter / sort here */),
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Now we have two HTTP requests, and the sorting or filtering operations are performed twice.
Let's apply share:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => users /* filter / sort here */),
      share()
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

We already know that share creates a new Subject that subscribes to the source. When the source emits, the subject passes the value to all its subscribers.

The problem is solved: when we subscribe to firstUser$, we subscribe to the internal subject rather than to the source stream directly.

Using shareReplay


shareReplay is useful when you need to emit, cache, and replay the last X values. A typical example is a singleton service that performs an HTTP request.


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
              .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}

No matter how many components request the data now or in the future, there will be only one HTTP request, and the result will be stored in the internal ReplaySubject buffer.
There may still be a case where you need to cancel an unfinished request because there are no subscribers left; then you will need to use refCount.

The full code can be found here.
