Bonjour, Habr! Je vous présente la traduction de l'article "Comprendre les opérateurs de multidiffusion RxJS" par Netanel Basal.Les opérateurs de diffusion ou de multidiffusion semblent souvent être le sujet le plus difficile à apprendre sur RxJS. Dans cet article, je vais essayer de tout expliquer de manière accessible.Nous considérerons la structure interne des opérateurs de multidiffusion et les tâches qu'ils résolvent.Commençons par décrire les blocs de construction de base de RxJS.Observable
Dans RxJS, les objets observables (ci-après dénommés «flux») sont initialement froids. Cela signifie que chaque fois que vous vous abonnez à un flux, un rappel de l'abonnement est effectué.Pour une meilleure compréhension, créez l'implémentation suivante:class Observable {
constructor(subscriptionFn) {
this.subscriptionFn = subscriptionFn;
}
subscribe(observer) {
return this.subscriptionFn(observer);
}
}
Le constructeur Observable
accepte un seul paramètre - le rappel de l'abonnementsubscriptionFn
. Il sera appelé chaque fois que nous nous abonnerons au stream ( subscribe()
).Parfois, ils appellent un rappel d'un abonnement producer
, car cela «produit» également des valeurs pour l'abonné (l'objet observateur dans notre code).La méthode subscribe()
prend une entrée observer
. Il est un objet avec trois méthodes propres: next(), error(), complete()
. Dans RxJS en direct, vous pouvez passer trois fonctions au lieu d'un objet.La méthode, subscribe()
lorsqu'elle est appelée, appelle la fonction d'abonnement en la passant à l'entrée observer
.Nous n'avons pas mentionné la méthode maintenantunsubscribe
, mais il ne faut pas oublier que chaque abonnement permet de le détruire. Le plus souvent, un abonnement renvoie une fonction (ou un objet avec la méthode appropriée), au cours de laquelle la connexion entre le flux et ses abonnés est détruite.Tout cela est assez simple. Rapprochez-vous de la réalité maintenant. Par exemple, encapsuler une API XHR native dans un flux
function http(url) {
const subscriptionFn = observer => {
log('Observable execution: http');
const xhr = new XMLHttpRequest();
xhr.addEventListener('load', () => {
if (xhr.readyState === 4 && xhr.status === 200) {
observer.next(JSON.parse(xhr.responseText));
observer.complete();
}
});
xhr.open('GET', url);
xhr.send();
return () => xhr.abort()
}
return new Observable(subscriptionFn);
}
Nous avons écrit une fonction http
qui reçoit une URL, exécute une demande http et renvoie un flux qui émet la réponse http reçue.Maintenant, en regardant notre implémentation, que pensez-vous qu'il se passera lorsque nous nous abonnerons à ce flux deux fois?
const observer = tag => ({
next(value) {
console.log(`${tag}:`, value);
}
});
http('https://jsonplaceholder.typicode.com/users')
.subscribe(observer('subscriber-1'));
http('https://jsonplaceholder.typicode.com/users')
.subscribe(observer('subscriber-2'));
Correctement, deux requêtes http seront exécutées. Si nous regardons à nouveau l'implémentation de la classe Observable, nous verrons pourquoi il en est ainsi. Chaque abonné appelle un rappel d'abonnement, qui à son tour effectue une demande http à chaque fois.
Les opérateurs
Un opérateur est une fonction qui prend un flux en entrée, effectue n'importe quelle action et renvoie un flux.Nous écrirons notre premier opérateur.function map(fn) {
return source => {
return new Observable(observer => {
log('Observable execution: map');
return source.subscribe({
next(value) {
observer.next(fn(value));
}
});
});
};
}
La fonction map()
renvoie un opérateur qui accepte le flux d'origine et retourne un flux dans lequel toutes les valeurs de passage seront transmises via la fonction fn
.Ceux. à l'intérieur, il y a toujours un abonnement au flux d'entrée.Avant d'utiliser ce nouvel opérateur, nous devons l'attacher en quelque sorte au flux. Prolongez notre classe Observable
enpipe()
class Observable {
constructor(subscriptionFn) {
this.subscriptionFn = subscriptionFn;
}
subscribe(observer) {
return this.subscriptionFn(observer);
}
pipe(...operators) {
return operators.reduce((source, next) => next(source), this);
}
}
Une méthode simple, une seule ligne de code. pipe
Il prend un tableau d'opérateurs et les appelle à son tour, en passant à chaque entrée le résultat de la précédente.Utilisons notre opérateur:http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]))
.subscribe(observer('subscriber'));
Lorsqu'il est appelé subscribe
, un abonnement au flux de sortie sera exécuté map()
, et à son tour map
, un abonnement au flux d'origine sera exécuté à l' intérieur .Le flux http émet la valeur dans laquelle il tombe map
. Ensuite, la fonction est exécutée fn
, le flux de map
émet la valeur vers l'abonnement final. Cela fonctionne comme une observable chain
chaîne de fils.
Si nous souscrivons à une chaîne deux fois, chaque abonnement de la chaîne sera appelé deux fois.const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]));
firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));
Et si ce comportement ne nous convient pas? Si nous voulons appeler la fonction d'abonnement une seule fois, combien d'abonnements aurions-nous?Par exemple, que se passe-t-il si nous voulons faire une seule requête http et utiliser le résultat pour tous les abonnés? Dans ce cas, vous en avez besoin Subject
.Sujets
Subject
c'est à la fois un flux et un abonné. Le flux - parce qu'il a une méthode subscribe()
, l'abonné - parce qu'il implémente l'interface d'abonné - méthodes next(), error(), complete().
Écrivons-le.class Subject extends Observable {
constructor() {
super();
this.observers = [];
}
subscribe(observer) {
this.observers.push(observer);
}
next(value) {
this.observers.forEach(observer => observer.next(value));
}
error(error) {
this.observers.forEach(observer => observer.error(error));
}
complete() {
this.observers.forEach(observer => observer.complete());
}
}
Subject
peut servir d'intermédiaire entre le flux froid et de nombreux abonnés.Modifiez notre exemple comme suit:const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]))
.subscribe(subject);
Lorsqu'elle est appelée subject.subscribe(someFn)
, une seule opération simple est effectuée - ajouter une subject.observers
fonction au tableau someFn
.Eh bien, comme il Subject
se comporte également comme un abonné, vous pouvez le souscrire au flux d'origine, c'est-à-dire lorsque le thread d'origine émet une valeur, elle est appelée subject.next()
, ce qui entraîne le transfert de cette valeur à chacun des abonnés subject
.Nous avons maintenant le rappel d'origine de l'abonnement exécuté une seule fois, et une seule requête http sera exécutée.
Les retardataires du parti
Que se passe-t-il si le flux d'origine a déjà fonctionné avant notre inscription?Il ne sera pas possible de le montrer dans l'exemple précédent, car http est asynchrone, même si vous vous y abonnez immédiatement après, la valeur viendra toujours après l'abonnement.Créons rapidement une fonction génératrice of
:
function of(...values) {
return new Observable(observer => {
log('Observable execution: of');
values.forEach(value => observer.next(value));
});
}
Un flux créé par moyen of()
émet des valeurs de manière synchrone, l'une après l'autre. Nous nous abonnerons subject
après qu'il a déjà souscrit à.const subject = new Subject();
of(1, 2, 3).subscribe(subject);
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
Nos abonnés n'ont rien reçu. Pourquoi? Notre implémentation ne prend pas en charge les abonnés «tardifs». Lorsque le flux d'origine of()
émet des valeurs, les abonnés ne sont pas encore enregistrés, ces valeurs n'iront nulle part.Dans des exemples réels sur Angular, il se pourrait bien que le flux source ait fonctionné, mais votre composant n'est pas encore présent sur la page. Et lorsque le composant apparaît, il s'abonne à la source, mais ne reçoit pas les valeurs qui ont déjà été transmises.Une façon de résoudre le problème est la suivante ReplaySubject
. Nous décrivons sa version et voyons comment cela fonctionne.
class ReplaySubject extends Subject {
constructor(bufferSize) {
super();
this.observers = [];
this.bufferSize = bufferSize;
this.buffer = [];
}
subscribe(observer) {
this.buffer.forEach(val => observer.next(val));
this.observers.push(observer);
}
next(value) {
if (this.buffer.length === this.bufferSize) {
this.buffer.shift();
}
this.buffer.push(value);
this.observers.forEach(observer => observer.next(value));
}
}
Le concept est simple. Comme son nom l'indique, ReplaySubject
celui-ci est spécial Subject
et peut reproduire les anciennes valeurs à tous les nouveaux abonnés.Chaque valeur publiée sera transférée à tous les abonnés actuels et enregistrée pour les futurs, la taille de la mémoire tampon est bufferSize
définie dans le constructeur.Réécrivez l'exemple précédent avec ReplaySubject
.const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
Le résultat a changé.Malgré l'abonnement tardif, nous les avons tous pris.
En résumé, le but ReplaySubject
est la distribution de valeurs à tous les abonnés et leur mise en cache pour les futurs abonnés "tardifs".Avant de continuer, je vous recommande d'essayer d'écrire votre propre implémentation BehaviorSubject
. Vous pouvez trouver le code fini à la fin de l'article.Maintenant, nous passons enfin aux opérateurs de multidiffusion. J'espère que les exemples ci-dessus vous aideront à les comprendre plus rapidement.Opérateurs de multidiffusion
Multidiffusion et connexion
L'opérateur multicast()
utilise Subject
pour envoyer le flux source à plusieurs abonnés.
import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';
const connectableObservable = interval(1000).pipe(
multicast(new Subject())
)
const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);
const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
.connect();
multicast
renvoie un objet ConnectableObservable
qui a une méthode connect
. Son but est de signer le sujet reçu dans le flux source.La méthode connect
nous permet de déterminer quand démarrer l'exécution du thread d'origine. Il y a un moment à garder à l'esprit - pour vous désabonner de la source, vous devez faire:connectableSubscription.unsubscribe();
Nous ne sommes pas limités au simple Subject
. Au lieu de cela, vous pouvez utiliser n'importe quelle classe dérivée, par exemple ReplaySubject
:import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';
const connectableObservable = interval(1000).pipe(
multicast(new ReplaySubject(1))
)
const observer1 = connectableObservable.subscribe(log);
setTimeout(() => {
connectableObservable.subscribe(log);
}, 3000)
const connectable = (connectableObservable as ConnectableObservable<any>).connect();
À partir de ce code, vous pouvez deviner ce qui se passera sous le capot.Lorsque nous utilisons multicast
, nous pouvons transférer non seulement Subject
, mais aussi une fonction d'usine, qui renvoie une nouvelle à chaque fois Subject
.Réutilisé déjà terminé subject
ne peut pas l'être, la fonction d'usine résout ce problème.interval(1000).pipe(
multicast(() => new Subject())
)
Refount
Lorsque nous utilisons l'opérateur multicast()
, nous sommes responsables de l'appel connect()
pour commencer l'exécution de l'observable d'origine. De plus, nous devons toujours surveiller les éventuelles fuites de mémoire, en vous désinscrivant manuellement ConnectableSubscription
.L'automatisation du processus éviterait les erreurs et simplifierait le code. Les aimables développeurs RxJS y ont pensé et ont créé un refCount
opérateur.refCount
compte les abonnements et lorsque le premier apparaît, il appelle connect()
, c'est-à-dire s'abonne. Quand il revient à zéro, une réponse sera appelée.const source = interval(1000).pipe(
multicast(new Subject()),
refCount()
)
const observer1 = source.subscribe(log);
const observer2 = source.subscribe(log);
setTimeout(() => {
observer1.unsubscribe();
observer2.unsubscribe();
}, 3000)
Notez qu'après avoir refCount
obtenu l'observable habituel, non ConnectableObservable
.Publier et ses variantes
multicast() + Subject + refCount()
c'est un cas assez typique dans RxJS et les développeurs l'ont réduit à un seul opérateur.Voyons quelles options nous avons.publish()
équivalent multicast(() => new Subject())
const connectableObservable = interval(1000).pipe(
publish()
)
connectableObservable.connect();
publishBehavior()
équivalent multicast(new BehaviorSubject())
const connectableObservable = interval(1000).pipe(
publishBehavior(100)
)
connectableObservable.connect();
publishReplay()
équivalent multicast(() => new ReplaySubject(x))
const connectableObservable = interval(1000).pipe(
publishReplay(3)
)
connectableObservable.connect();
publishLast()
équivalent multicast(new AsyncSubject())
const connectableObservable = interval(1000).pipe(
take(2),
publishLast()
)
connectableObservable.connect();
share()
équivalent multicast(() => new Subject()) + refCount()
const source = interval(1000).pipe(
share()
)
shareReplay(bufferSize)
Il s'agit d'un opérateur de multidiffusion qui utilise ReplaySubject
. Il n'a pas à l'intérieur multicast()
et son résultat est observable, non ConnectableObservable
. Il peut être utilisé avec refCount
ou sans. Voici les deux options:
interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 })
)
interval(1000).pipe(
shareReplay(1)
)
Lorsqu'il est shareReplay
appelé avec, { refCount: false }
c'est comme appeler shareReplay(x)
.Dans ce cas, il n'y aura pas de comptage de références. Cela signifie que jusqu'à ce que le flux d'origine soit terminé, il shareReplay
y sera abonné, qu'il ait lui-même ou non les abonnés finaux. Tous les nouveaux abonnés recevront les dernières valeurs x.shareReplay vs publishReplay + refCount
À première vue , c'est shareReplay({ refCount: true, bufferSize: X })
identique publishReplay(X) + refCount()
, mais ce n'est pas tout à fait vrai.Voyons quelles sont les similitudes et quelle est la différence.Ils ont le même comportement refCount
- s'abonner et se désinscrire du flux d'origine en fonction du nombre d'abonnés. Ils réagissent également de la même manière lorsque le flux d'origine est terminé - tous les nouveaux abonnés reçoivent X dernières valeurs.Cependant, si le flux d'origine n'est pas encore finalisé, dans ce cas où nous l'avons publishReplay(X) + refCount()
- tous les nouveaux abonnés reçoivent des valeurs X du tampon, puis seront re-signés en utilisant le même ReplaySubject
.Mais si nous utilisons les shareReplay({ refCount: true, bufferSize: 1 })
dernières valeurs X, ils ne les obtiendront pas, car à l'intérieur, il en crée une nouvelle ReplaySubject
et l'utilise pour se réabonner à la source.Exemples illustrant cela:const source = interval(1000).pipe(
publishReplay(1),
refCount()
);
const one = source.subscribe(observer('subcriber-1'));
setTimeout(() => {
one.unsubscribe();
const two = source.subscribe(observer('subcriber-2'));
}, 3000);
const source = interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 })
);
const one = source.subscribe(observer('subcriber-1'));
setTimeout(() => {
one.unsubscribe();
const two = source.subscribe(observer('subcriber-2'));
}, 3000);


Exemples réels en angulaire
Voyons comment utiliser les opérateurs de multidiffusion étudiés dans des conditions de combat.Nous utilisons le partage
Supposons que nous ayons un composant qui a besoin de données du flux d'origine. Cela peut être une requête http, un état ou autre. Et nous avons également besoin de manipulation de données, comme le filtrage, le tri, etc.@Component({
template: `
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
);
}
}
Et maintenant, nous avons besoin d'un autre composant qui ne montre que le premier utilisateur. Si nous nous abonnons au flux source tel quel, alors:@Component({
template: `
<user [user]="firstUser$ | async"></user>
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
firstUser$: Observable<User>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
);
this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
}
}
Et maintenant nous avons deux requêtes http, les opérations de tri ou de filtrage seront effectuées deux fois.Nous appliquons share
:@Component({
template: `
<user [user]="firstUser$ | async"></user>
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
firstUser$: Observable<User>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
share()
);
this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
}
}
Nous savons déjà qu'il en crée un nouveau Subject
qui s'abonne à la source. Lorsque la source émet, le sujet transmet cette valeur à tous ses abonnés.Le problème est résolu, et lorsque nous nous sommes abonnés à firstUser$
- nous nous sommes abonnés au subject
flux interne , et non directement au flux d'origine.Utilisation de ShareReplay
ShareReplay
s'applique lorsque vous devez émettre, mettre en cache et répéter les dernières valeurs X. Un exemple typique est un service singleton qui exécute une requête http.
@Injectable({ providedIn: 'root' })
export class BlogService {
posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
.pipe(shareReplay(1));
constructor(private http: HttpClient) {}
}
Peu importe le nombre de composants qui demanderont des données maintenant ou à l'avenir, il n'y aura qu'une seule demande http et le résultat sera enregistré dans le tampon interne ReplaySubject
.Il peut toujours y avoir un cas où vous devez annuler une demande incomplète, car il n'y a pas d'abonnés, vous devrez alors postuler refCount
.Le code complet peut être trouvé ici .