Nous étudions les opérateurs RxJS multicast

Bonjour, Habr! Je vous présente la traduction de l'article "Comprendre les opérateurs de multidiffusion RxJS" par Netanel Basal.

Les opérateurs de diffusion ou de multidiffusion semblent souvent être le sujet le plus difficile à apprendre sur RxJS. Dans cet article, je vais essayer de tout expliquer de manière accessible.

Nous considérerons la structure interne des opérateurs de multidiffusion et les tâches qu'ils résolvent.

Commençons par décrire les blocs de construction de base de RxJS.

Observable


Dans RxJS, les objets observables (ci-après dénommés «flux») sont initialement froids. Cela signifie que chaque fois que vous vous abonnez à un flux, un rappel de l'abonnement est effectué.

Pour une meilleure compréhension, créez l'implémentation suivante:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

Le constructeur Observableaccepte un seul paramètre - le rappel de l'abonnement
subscriptionFn. Il sera appelé chaque fois que nous nous abonnerons au stream ( subscribe()).

Parfois, ils appellent un rappel d'un abonnement producer, car cela «produit» également des valeurs pour l'abonné (l'objet observateur dans notre code).

La méthode subscribe()prend une entrée observer. Il est un objet avec trois méthodes propres: next(), error(), complete(). Dans RxJS en direct, vous pouvez passer trois fonctions au lieu d'un objet.

La méthode, subscribe()lorsqu'elle est appelée, appelle la fonction d'abonnement en la passant à l'entrée observer.

Nous n'avons pas mentionné la méthode maintenantunsubscribe, mais il ne faut pas oublier que chaque abonnement permet de le détruire. Le plus souvent, un abonnement renvoie une fonction (ou un objet avec la méthode appropriée), au cours de laquelle la connexion entre le flux et ses abonnés est détruite.

Tout cela est assez simple. Rapprochez-vous de la réalité maintenant. Par exemple, encapsuler une API XHR native dans un flux


function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });
    xhr.open('GET', url);
    xhr.send();
    
    return () => xhr.abort()
  }
  
  return new Observable(subscriptionFn);
}

Nous avons écrit une fonction httpqui reçoit une URL, exécute une demande http et renvoie un flux qui émet la réponse http reçue.

Maintenant, en regardant notre implémentation, que pensez-vous qu'il se passera lorsque nous nous abonnerons à ce flux deux fois?


// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  }
});

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

Correctement, deux requêtes http seront exécutées. Si nous regardons à nouveau l'implémentation de la classe Observable, nous verrons pourquoi il en est ainsi. Chaque abonné appelle un rappel d'abonnement, qui à son tour effectue une demande http à chaque fois.



Les opérateurs


Un opérateur est une fonction qui prend un flux en entrée, effectue n'importe quelle action et renvoie un flux.

Nous écrirons notre premier opérateur.
function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
        next(value) {
          observer.next(fn(value));
        }
      });
    });
  };
}

La fonction map()renvoie un opérateur qui accepte le flux d'origine et retourne un flux dans lequel toutes les valeurs de passage seront transmises via la fonction fn.

Ceux. à l'intérieur, il y a toujours un abonnement au flux d'entrée.

Avant d'utiliser ce nouvel opérateur, nous devons l'attacher en quelque sorte au flux. Prolongez notre classe Observableenpipe()

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

Une méthode simple, une seule ligne de code. pipeIl prend un tableau d'opérateurs et les appelle à son tour, en passant à chaque entrée le résultat de la précédente.

Utilisons notre opérateur:

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

Lorsqu'il est appelé subscribe, un abonnement au flux de sortie sera exécuté map(), et à son tour map, un abonnement au flux d'origine sera exécuté à l' intérieur .

Le flux http émet la valeur dans laquelle il tombe map. Ensuite, la fonction est exécutée fn, le flux de mapémet la valeur vers l'abonnement final. Cela fonctionne comme une observable chainchaîne de fils.



Si nous souscrivons à une chaîne deux fois, chaque abonnement de la chaîne sera appelé deux fois.

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
    .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));



Et si ce comportement ne nous convient pas? Si nous voulons appeler la fonction d'abonnement une seule fois, combien d'abonnements aurions-nous?

Par exemple, que se passe-t-il si nous voulons faire une seule requête http et utiliser le résultat pour tous les abonnés? Dans ce cas, vous en avez besoin Subject.

Sujets


Subjectc'est à la fois un flux et un abonné. Le flux - parce qu'il a une méthode subscribe(), l'abonné - parce qu'il implémente l'interface d'abonné - méthodes next(), error(), complete().

Écrivons-le.

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

Subjectpeut servir d'intermédiaire entre le flux froid et de nombreux abonnés.

Modifiez notre exemple comme suit:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

Lorsqu'elle est appelée subject.subscribe(someFn), une seule opération simple est effectuée - ajouter une subject.observersfonction au tableau someFn.

Eh bien, comme il Subjectse comporte également comme un abonné, vous pouvez le souscrire au flux d'origine, c'est-à-dire lorsque le thread d'origine émet une valeur, elle est appelée subject.next(), ce qui entraîne le transfert de cette valeur à chacun des abonnés subject.

Nous avons maintenant le rappel d'origine de l'abonnement exécuté une seule fois, et une seule requête http sera exécutée.



Les retardataires du parti


Que se passe-t-il si le flux d'origine a déjà fonctionné avant notre inscription?

Il ne sera pas possible de le montrer dans l'exemple précédent, car http est asynchrone, même si vous vous y abonnez immédiatement après, la valeur viendra toujours après l'abonnement.

Créons rapidement une fonction génératrice of:


function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

Un flux créé par moyen of()émet des valeurs de manière synchrone, l'une après l'autre. Nous nous abonnerons subjectaprès qu'il a déjà souscrit à.

const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Nos abonnés n'ont rien reçu. Pourquoi? Notre implémentation ne prend pas en charge les abonnés «tardifs». Lorsque le flux d'origine of()émet des valeurs, les abonnés ne sont pas encore enregistrés, ces valeurs n'iront nulle part.

Dans des exemples réels sur Angular, il se pourrait bien que le flux source ait fonctionné, mais votre composant n'est pas encore présent sur la page. Et lorsque le composant apparaît, il s'abonne à la source, mais ne reçoit pas les valeurs qui ont déjà été transmises.

Une façon de résoudre le problème est la suivante ReplaySubject. Nous décrivons sa version et voyons comment cela fonctionne.


class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

Le concept est simple. Comme son nom l'indique, ReplaySubjectcelui-ci est spécial Subjectet peut reproduire les anciennes valeurs à tous les nouveaux abonnés.

Chaque valeur publiée sera transférée à tous les abonnés actuels et enregistrée pour les futurs, la taille de la mémoire tampon est bufferSizedéfinie dans le constructeur.

Réécrivez l'exemple précédent avec ReplaySubject.

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Le résultat a changé.

Malgré l'abonnement tardif, nous les avons tous pris.



En résumé, le but ReplaySubjectest la distribution de valeurs à tous les abonnés et leur mise en cache pour les futurs abonnés "tardifs".

Avant de continuer, je vous recommande d'essayer d'écrire votre propre implémentation BehaviorSubject. Vous pouvez trouver le code fini à la fin de l'article.

Maintenant, nous passons enfin aux opérateurs de multidiffusion. J'espère que les exemples ci-dessus vous aideront à les comprendre plus rapidement.

Opérateurs de multidiffusion


Multidiffusion et connexion


L'opérateur multicast() utilise Subjectpour envoyer le flux source à plusieurs abonnés.


import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

multicastrenvoie un objet ConnectableObservablequi a une méthode connect. Son but est de signer le sujet reçu dans le flux source.

La méthode connectnous permet de déterminer quand démarrer l'exécution du thread d'origine. Il y a un moment à garder à l'esprit - pour vous désabonner de la source, vous devez faire:

connectableSubscription.unsubscribe();

Nous ne sommes pas limités au simple Subject. Au lieu de cela, vous pouvez utiliser n'importe quelle classe dérivée, par exemple ReplaySubject:

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

À partir de ce code, vous pouvez deviner ce qui se passera sous le capot.

Lorsque nous utilisons multicast, nous pouvons transférer non seulement Subject, mais aussi une fonction d'usine, qui renvoie une nouvelle à chaque fois Subject.

Réutilisé déjà terminé subjectne peut pas l'être, la fonction d'usine résout ce problème.

interval(1000).pipe(
  multicast(() => new Subject())
)

Refount


Lorsque nous utilisons l'opérateur multicast(), nous sommes responsables de l'appel connect()pour commencer l'exécution de l'observable d'origine. De plus, nous devons toujours surveiller les éventuelles fuites de mémoire, en vous désinscrivant manuellement ConnectableSubscription.

L'automatisation du processus éviterait les erreurs et simplifierait le code. Les aimables développeurs RxJS y ont pensé et ont créé un refCountopérateur.

refCountcompte les abonnements et lorsque le premier apparaît, il appelle connect(), c'est-à-dire s'abonne. Quand il revient à zéro, une réponse sera appelée.

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)
 
// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubcribe();
}, 3000)

Notez qu'après avoir refCountobtenu l'observable habituel, non ConnectableObservable.

Publier et ses variantes


multicast() + Subject + refCount()c'est un cas assez typique dans RxJS et les développeurs l'ont réduit à un seul opérateur.

Voyons quelles options nous avons.

  • publish() équivalent multicast(() => new Subject())
    const connectableObservable = interval(1000).pipe(
      publish()
    )
    
    connectableObservable.connect();
    

  • publishBehavior() équivalent multicast(new BehaviorSubject())
    const connectableObservable = interval(1000).pipe(
      publishBehavior(100)
    )
    
    connectableObservable.connect();
    

  • publishReplay() équivalent multicast(() => new ReplaySubject(x))
    const connectableObservable = interval(1000).pipe(
      publishReplay(3)
    )
    
    connectableObservable.connect();
    

  • publishLast() équivalent multicast(new AsyncSubject())
    const connectableObservable = interval(1000).pipe(
      take(2),
      publishLast()
    )
    
    connectableObservable.connect();
    

  • share() équivalent multicast(() => new Subject()) + refCount()
    const source = interval(1000).pipe(
      share()
    )
    

  • shareReplay(bufferSize) Il s'agit d'un opérateur de multidiffusion qui utilise ReplaySubject. Il n'a pas à l'intérieur multicast()et son résultat est observable, non ConnectableObservable. Il peut être utilisé avec refCountou sans. Voici les deux options:

    interval(1000).pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    )
    
    interval(1000).pipe(
      shareReplay(1)
    )
    

Lorsqu'il est shareReplayappelé avec, { refCount: false }c'est comme appeler shareReplay(x).

Dans ce cas, il n'y aura pas de comptage de références. Cela signifie que jusqu'à ce que le flux d'origine soit terminé, il shareReplayy sera abonné, qu'il ait lui-même ou non les abonnés finaux. Tous les nouveaux abonnés recevront les dernières valeurs x.

shareReplay vs publishReplay + refCount


À première vue , c'est shareReplay({ refCount: true, bufferSize: X })identique publishReplay(X) + refCount() , mais ce n'est pas tout à fait vrai.

Voyons quelles sont les similitudes et quelle est la différence.

Ils ont le même comportement refCount- s'abonner et se désinscrire du flux d'origine en fonction du nombre d'abonnés. Ils réagissent également de la même manière lorsque le flux d'origine est terminé - tous les nouveaux abonnés reçoivent X dernières valeurs.

Cependant, si le flux d'origine n'est pas encore finalisé, dans ce cas où nous l'avons publishReplay(X) + refCount()- tous les nouveaux abonnés reçoivent des valeurs X du tampon, puis seront re-signés en utilisant le même ReplaySubject.
Mais si nous utilisons les shareReplay({ refCount: true, bufferSize: 1 })dernières valeurs X, ils ne les obtiendront pas, car à l'intérieur, il en crée une nouvelle ReplaySubject et l'utilise pour se réabonner à la source.

Exemples illustrant cela:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
 
  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
  
  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);





Exemples réels en angulaire


Voyons comment utiliser les opérateurs de multidiffusion étudiés dans des conditions de combat.

Nous utilisons le partage


Supposons que nous ayons un composant qui a besoin de données du flux d'origine. Cela peut être une requête http, un état ou autre. Et nous avons également besoin de manipulation de données, comme le filtrage, le tri, etc.

@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
  }
}

Et maintenant, nous avons besoin d'un autre composant qui ne montre que le premier utilisateur. Si nous nous abonnons au flux source tel quel, alors:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Et maintenant nous avons deux requêtes http, les opérations de tri ou de filtrage seront effectuées deux fois.
Nous appliquons share:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
      share()
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Nous savons déjà qu'il en crée un nouveau Subjectqui s'abonne à la source. Lorsque la source émet, le sujet transmet cette valeur à tous ses abonnés.

Le problème est résolu, et lorsque nous nous sommes abonnés à firstUser$- nous nous sommes abonnés au subjectflux interne , et non directement au flux d'origine.

Utilisation de ShareReplay


ShareReplays'applique lorsque vous devez émettre, mettre en cache et répéter les dernières valeurs X. Un exemple typique est un service singleton qui exécute une requête http.


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
              .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}

Peu importe le nombre de composants qui demanderont des données maintenant ou à l'avenir, il n'y aura qu'une seule demande http et le résultat sera enregistré dans le tampon interne ReplaySubject.
Il peut toujours y avoir un cas où vous devez annuler une demande incomplète, car il n'y a pas d'abonnés, vous devrez alors postuler refCount.

Le code complet peut être trouvé ici .

All Articles