Hello, Habr! I present to you the translation of the article "Understanding RxJS Multicast Operators" by Netanel Basal.Broadcast or multicast operators often seem like the hardest topic to learn about RxJS. In this article I will try to explain everything in an accessible way.We will consider the internal structure of multicast operators and the tasks they solve.Let's start by describing the basic building blocks of RxJS.Observable
In RxJS, observable objects (hereinafter referred to as “streams”) are initially cold. This means that every time you subscribe to a stream, a callback of the subscription is performed.For a better understanding, create the following implementation:class Observable {
constructor(subscriptionFn) {
this.subscriptionFn = subscriptionFn;
}
subscribe(observer) {
return this.subscriptionFn(observer);
}
}
The constructor Observable
accepts a single parameter - the callback of the subscriptionsubscriptionFn
. It will be called every time we subscribe to the stream ( subscribe()
).Sometimes they also call a callback of a subscription producer
, as it also “produces” values for the subscriber (observer object in our code).The method subscribe()
takes an input observer
. It is an object with three own methods: next(), error(), complete()
. In live RxJS, you can pass three functions instead of an object.The method, subscribe()
when called, calls the subscription function passing it to the input observer
.We did not mention the method nowunsubscribe
, but it should be borne in mind that each subscription provides a way to destroy it. Most often, a subscription returns a function (or an object with the appropriate method), during which the connection between the stream and its subscribers is destroyed.This is all pretty simple. Let's move closer to reality now. For example, wrap a native XHR API in a stream
function http(url) {
const subscriptionFn = observer => {
log('Observable execution: http');
const xhr = new XMLHttpRequest();
xhr.addEventListener('load', () => {
if (xhr.readyState === 4 && xhr.status === 200) {
observer.next(JSON.parse(xhr.responseText));
observer.complete();
}
});
xhr.open('GET', url);
xhr.send();
return () => xhr.abort()
}
return new Observable(subscriptionFn);
}
We wrote a function http
that receives a URL, executes an http request, and returns a stream that emits the received http response.Now, looking at our implementation, what do you think will happen when we subscribe to this stream twice?
const observer = tag => ({
next(value) {
console.log(`${tag}:`, value);
}
});
http('https://jsonplaceholder.typicode.com/users')
.subscribe(observer('subscriber-1'));
http('https://jsonplaceholder.typicode.com/users')
.subscribe(observer('subscriber-2'));
Correctly, two http requests will be executed. If we look again at the implementation of the Observable class, we will see why this is so. Each subscriber calls a subscription callback, which in turn performs an http request each time.
Operators
An operator is a function that takes a stream as an input, performs any action, and returns a stream.We will write our first own operator.function map(fn) {
return source => {
return new Observable(observer => {
log('Observable execution: map');
return source.subscribe({
next(value) {
observer.next(fn(value));
}
});
});
};
}
The function map()
returns an operator that accepts the original stream and returns a stream in which all passing values will be passed through the function fn
.Those. inside it there is always a subscription to the input stream.Before using this new operator, we need to somehow attach it to the stream. Extend our class Observable
bypipe()
class Observable {
constructor(subscriptionFn) {
this.subscriptionFn = subscriptionFn;
}
subscribe(observer) {
return this.subscriptionFn(observer);
}
pipe(...operators) {
return operators.reduce((source, next) => next(source), this);
}
}
A simple method, just one line of code. pipe
It takes an array of operators and calls them in turn, passing each input the result of the previous one.Let's use our operator:http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]))
.subscribe(observer('subscriber'));
When called subscribe
, a subscription to the output stream will be executed map()
, and in turn map
, a subscription to the original stream will be executed inside .The http stream emits the value it falls into map
. Then, the function is executed fn
, the stream from map
emits the value to the final subscription. It works like a observable chain
thread chain.
If we subscribe to a chain twice, each subscription in the chain will be called twice.const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]));
firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));
And if this behavior does not suit us? If we want to call the subscription function only once, how many subscriptions would we have?For example, what if we want to make one http request and use the result for all subscribers? In this case you need Subject
.Subjects
Subject
it is both a stream and a subscriber. The flow - because it has a method subscribe()
, the subscriber - because it implements the subscriber interface - methods next(), error(), complete().
Let's write it.class Subject extends Observable {
constructor() {
super();
this.observers = [];
}
subscribe(observer) {
this.observers.push(observer);
}
next(value) {
this.observers.forEach(observer => observer.next(value));
}
error(error) {
this.observers.forEach(observer => observer.error(error));
}
complete() {
this.observers.forEach(observer => observer.complete());
}
}
Subject
can act as an intermediate between the cold stream and many subscribers.Change our example as follows:const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]))
.subscribe(subject);
When called subject.subscribe(someFn)
, only one simple operation is performed - adding a subject.observers
function to the array someFn
.Well, then, since it Subject
behaves as a subscriber too, you can subscribe it to the original stream, i.e. when the original thread issues a value, it is called subject.next()
, which entails the transfer of this value to each of the subscribers subject
.Now we have the original callback of the subscription is executed once, and only one http-request will be executed.
Party latecomers
What happens if the original stream already worked before we signed up?It will not be possible to show this in the previous example, since http is asynchronous, even if you subscribe to it immediately after, the value will still come after the subscription.Let's quickly create a generating function of
:
function of(...values) {
return new Observable(observer => {
log('Observable execution: of');
values.forEach(value => observer.next(value));
});
}
A stream created by means of()
emits values synchronously, one after the other. We will subscribe subject
after it has already subscribed to of.const subject = new Subject();
of(1, 2, 3).subscribe(subject);
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
Our subscribers have not received anything. Why? Our implementation does not support “late” subscribers. When the original stream from of()
emits values, subscribers are not registered yet, these values will go nowhere.In actual examples on Angular, it may well be that the source stream worked, but your component is not yet present on the page. And when the component appears, it subscribes to the source, but does not receive the values that have already passed.One way to solve the problem is this ReplaySubject
. We outline its version and see how it works.
class ReplaySubject extends Subject {
constructor(bufferSize) {
super();
this.observers = [];
this.bufferSize = bufferSize;
this.buffer = [];
}
subscribe(observer) {
this.buffer.forEach(val => observer.next(val));
this.observers.push(observer);
}
next(value) {
if (this.buffer.length === this.bufferSize) {
this.buffer.shift();
}
this.buffer.push(value);
this.observers.forEach(observer => observer.next(value));
}
}
The concept is simple. As the name implies, ReplaySubject
this is a special Subject
one that can reproduce old values to all new subscribers.Each released value will be transferred to all current subscribers and saved for future ones, the buffer size is bufferSize
set in the constructor.Rewrite the previous example with ReplaySubject
.const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
The result has changed.Despite the late subscription, we caught them all.
Summarizing, the purpose ReplaySubject
is the distribution of values to all subscribers and caching them for future "late" subscribers.Before moving on, I recommend that you try writing your own implementation BehaviorSubject
. You can find the finished code at the end of the article.Now we finally move on to multicast operators. I hope the above examples will help you understand them faster.Multicast operators
Multicast and Connect
The operator multicast()
uses Subject
to issue the source stream to several subscribers.
import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';
const connectableObservable = interval(1000).pipe(
multicast(new Subject())
)
const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);
const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
.connect();
multicast
returns an object ConnectableObservable
that has a method connect
. Its purpose is to subscribe the received subject to the source stream.The method connect
allows us to determine when to start the execution of the original thread. There is a moment to keep in mind - to unsubscribe from the source you need to do:connectableSubscription.unsubscribe();
We are not limited to simple Subject
. Instead, you can use any derived class, for example ReplaySubject
:import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';
const connectableObservable = interval(1000).pipe(
multicast(new ReplaySubject(1))
)
const observer1 = connectableObservable.subscribe(log);
setTimeout(() => {
connectableObservable.subscribe(log);
}, 3000)
const connectable = (connectableObservable as ConnectableObservable<any>).connect();
From this code you can guess what will happen under the hood.When we use multicast
, we can transfer not only Subject
, but also a factory function, which returns a new one each time Subject
.Reused already completed subject
can not be, the factory function solves this problem.interval(1000).pipe(
multicast(() => new Subject())
)
Refount
When we use the operator multicast()
, we are responsible for the call connect()
to start execution of the original observable. Plus, we still have to monitor for possible memory leaks, manually unsubscribing from ConnectableSubscription
.Automation of the process would avoid errors and simplify the code. The kind RxJS developers thought about it for us and created an refCount
operator.refCount
counts subscriptions and when the first appears, it calls connect()
, i.e. subscribes. When it decreases back to zero, a response will be called.const source = interval(1000).pipe(
multicast(new Subject()),
refCount()
)
const observer1 = source.subscribe(log);
const observer2 = source.subscribe(log);
setTimeout(() => {
observer1.unsubscribe();
observer2.unsubscribe();
}, 3000)
Note that after refCount
we get the usual observable, not ConnectableObservable
.Publish and its options
multicast() + Subject + refCount()
this is a fairly typical case in RxJS and the developers have reduced it to a single operator.Let's see what options we have.publish()
equivalent multicast(() => new Subject())
const connectableObservable = interval(1000).pipe(
publish()
)
connectableObservable.connect();
publishBehavior()
equivalent multicast(new BehaviorSubject())
const connectableObservable = interval(1000).pipe(
publishBehavior(100)
)
connectableObservable.connect();
publishReplay()
equivalent multicast(() => new ReplaySubject(x))
const connectableObservable = interval(1000).pipe(
publishReplay(3)
)
connectableObservable.connect();
publishLast()
equivalent multicast(new AsyncSubject())
const connectableObservable = interval(1000).pipe(
take(2),
publishLast()
)
connectableObservable.connect();
share()
equivalent multicast(() => new Subject()) + refCount()
const source = interval(1000).pipe(
share()
)
shareReplay(bufferSize)
This is a multicast operator that uses ReplaySubject
. He has no inside multicast()
and his result is observable, not ConnectableObservable
. It can be used with refCount
or without it. Here are both options:
interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 })
)
interval(1000).pipe(
shareReplay(1)
)
When shareReplay
called with { refCount: false }
is like calling shareReplay(x)
.In this case, there will be no reference counting. This means that until the original stream is completed, it shareReplay
will be subscribed to it, regardless of whether it itself has the final subscribers or not. All new subscribers will receive the last x values.shareReplay vs publishReplay + refCount
At first glance , it is shareReplay({ refCount: true, bufferSize: X })
identical publishReplay(X) + refCount()
, but this is not entirely true.Let's see what are the similarities and what is the difference.They have the same behavior refCount
- subscribing and unsubscribing from the original stream based on the number of subscribers. They also react the same when the original stream is completed - all new subscribers receive X last values.However, if the original stream is not yet finalized, in this case when we have publishReplay(X) + refCount()
- all new subscribers receive X values from the buffer, and then will be re-signed using the same one ReplaySubject
.But if we use the shareReplay({ refCount: true, bufferSize: 1 })
last X values, they will not get it, since inside it creates a new one ReplaySubject
and uses it to re-subscribe to the source.Examples illustrating this:const source = interval(1000).pipe(
publishReplay(1),
refCount()
);
const one = source.subscribe(observer('subcriber-1'));
setTimeout(() => {
one.unsubscribe();
const two = source.subscribe(observer('subcriber-2'));
}, 3000);
const source = interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 })
);
const one = source.subscribe(observer('subcriber-1'));
setTimeout(() => {
one.unsubscribe();
const two = source.subscribe(observer('subcriber-2'));
}, 3000);


Actual examples in Angular
Let's see how to use the studied multicast operators in combat conditions.We use share
Suppose we have a component that needs data from the original stream. It could be an http request, a state, or whatever. And we also need data manipulation, such as filtering, sorting, etc.@Component({
template: `
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
);
}
}
And now we need another component that shows only the first user. If we subscribe to the source stream as it is, then:@Component({
template: `
<user [user]="firstUser$ | async"></user>
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
firstUser$: Observable<User>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
);
this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
}
}
And now we have two http requests, sorting or filtering operations will be performed twice.We apply share
:@Component({
template: `
<user [user]="firstUser$ | async"></user>
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
firstUser$: Observable<User>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
share()
);
this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
}
}
We already know that he creates a new Subject
one that subscribes to the source. When the source emits, subject passes this value to all its subscribers.The problem is resolved, and when we subscribed to firstUser$
- we subscribed to the internal subject
, and not the original stream directly.Using ShareReplay
ShareReplay
applies when you need to emit, cache and repeat the last X values. A typical example is a singleton service that performs an http request.
@Injectable({ providedIn: 'root' })
export class BlogService {
posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
.pipe(shareReplay(1));
constructor(private http: HttpClient) {}
}
It doesn’t matter how many components will request data now or in the future, there will be only one http request and the result will be saved in the internal buffer ReplaySubject
.There may still be a case where you need to cancel an incomplete request, because there are no subscribers, then you will need to apply refCount
.The full code can be found here .