Wir untersuchen Multicast-RxJS-Operatoren

Hallo Habr! Ich präsentiere Ihnen die Übersetzung des Artikels "Grundlegendes zu RxJS-Multicast-Operatoren" von Netanel Basal.

Broadcast- oder Multicast-Betreiber scheinen oft das schwierigste Thema zu sein, um etwas über RxJS zu lernen. In diesem Artikel werde ich versuchen, alles auf zugängliche Weise zu erklären.

Wir werden die interne Struktur von Multicast-Operatoren und die von ihnen gelösten Aufgaben berücksichtigen.

Beginnen wir mit der Beschreibung der Grundbausteine ​​von RxJS.

Beobachtbar


In RxJS sind beobachtbare Objekte (im Folgenden als "Ströme" bezeichnet) anfangs kalt. Dies bedeutet, dass jedes Mal, wenn Sie einen Stream abonnieren, ein Rückruf des Abonnements durchgeführt wird.

Erstellen Sie zum besseren Verständnis die folgende Implementierung:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

Der Konstruktor Observableakzeptiert einen einzelnen Parameter - den Rückruf des Abonnements
subscriptionFn. Es wird jedes Mal aufgerufen, wenn wir den Stream abonnieren ( subscribe()).

Manchmal rufen sie auch einen Rückruf eines Abonnements auf producer, da es auch Werte für den Abonnenten "erzeugt" (Beobachterobjekt in unserem Code).

Die Methode subscribe()nimmt eine Eingabe entgegen observer. Es ist ein Objekt mit drei eigenen Methoden : next(), error(), complete(). In Live-RxJS können Sie anstelle eines Objekts drei Funktionen übergeben. Wenn

die Methode subscribe()aufgerufen wird, ruft sie die Abonnementfunktion auf und übergibt sie an die Eingabe observer.

Wir haben die Methode jetzt nicht erwähntunsubscribeEs sollte jedoch beachtet werden, dass jedes Abonnement eine Möglichkeit bietet, es zu zerstören. In den meisten Fällen gibt ein Abonnement eine Funktion (oder ein Objekt mit der entsprechenden Methode) zurück, bei der die Verbindung zwischen dem Stream und seinen Abonnenten zerstört wird.

Das ist alles ziemlich einfach. Kommen wir jetzt der Realität näher. Wickeln Sie beispielsweise eine native XHR-API in einen Stream ein


function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });
    xhr.open('GET', url);
    xhr.send();
    
    return () => xhr.abort()
  }
  
  return new Observable(subscriptionFn);
}

Wir haben eine Funktion geschrieben http, die eine URL empfängt, eine http-Anforderung ausführt und einen Stream zurückgibt, der die empfangene http-Antwort ausgibt.

Was wird Ihrer Meinung nach passieren, wenn wir diesen Stream zweimal abonnieren?


// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  }
});

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

Richtig, zwei http-Anfragen werden ausgeführt. Wenn wir uns die Implementierung der Observable-Klasse noch einmal ansehen, werden wir sehen, warum dies so ist. Jeder Teilnehmer ruft einen Abonnement-Rückruf auf, der seinerseits jedes Mal eine http-Anforderung ausführt.



Betreiber


Ein Operator ist eine Funktion, die einen Stream als Eingabe verwendet, eine beliebige Aktion ausführt und einen Stream zurückgibt.

Wir werden unseren ersten eigenen Operator schreiben.
function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
        next(value) {
          observer.next(fn(value));
        }
      });
    });
  };
}

Die Funktion map()gibt einen Operator zurück, der den ursprünglichen Stream akzeptiert, und gibt einen Stream zurück, in dem alle übergebenen Werte durch die Funktion geleitet werden fn.

Jene. Darin befindet sich immer ein Abonnement für den Eingabestream.

Bevor wir diesen neuen Operator verwenden können, müssen wir ihn irgendwie an den Stream anhängen. Erweitern Sie unsere Klasse Observableumpipe()

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

Eine einfache Methode, nur eine Codezeile. pipeEs nimmt eine Reihe von Operatoren und ruft sie nacheinander auf, wobei jede Eingabe das Ergebnis der vorherigen übergibt.

Verwenden wir unseren Operator:

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

Beim Aufruf subscribewird ein Abonnement für den Ausgabestream ausgeführt map(), und im Gegenzug mapwird ein Abonnement für den ursprünglichen Stream ausgeführt.

Der http-Stream gibt den Wert aus, in den er fällt map. Dann wird die Funktion ausgeführt fn, der Stream von mapgibt den Wert an das endgültige Abonnement aus. Es funktioniert wie eine observable chainFadenkette.



Wenn wir eine Kette zweimal abonnieren, wird jedes Abonnement in der Kette zweimal aufgerufen.

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
    .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));



Und wenn dieses Verhalten nicht zu uns passt? Wenn wir die Abonnementfunktion nur einmal aufrufen möchten, wie viele Abonnements hätten wir?

Was ist zum Beispiel, wenn wir eine http-Anfrage stellen und das Ergebnis für alle Abonnenten verwenden möchten? In diesem Fall brauchen Sie Subject.

Themen


SubjectEs ist sowohl ein Stream als auch ein Abonnent. Der Ablauf - weil er eine Methode hat subscribe(), der Abonnent - weil er die Abonnentenschnittstelle implementiert - Methoden next(), error(), complete().

Lassen Sie uns ihn schreiben.

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

Subjectkann als Zwischenstufe zwischen dem kalten Strom und vielen Teilnehmern fungieren.

Ändern Sie unser Beispiel wie folgt:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

Beim Aufruf subject.subscribe(someFn)wird nur eine einfache Operation ausgeführt - Hinzufügen einer subject.observersFunktion zum Array someFn.

Nun, da es Subjectsich auch als Abonnent verhält, können Sie den ursprünglichen Stream abonnieren, d. H. Wenn der ursprüngliche Thread einen Wert ausgibt, wird dieser aufgerufen subject.next(), wodurch dieser Wert an jeden der Abonnenten übertragen wird subject.

Jetzt haben wir den ursprünglichen Rückruf des Abonnements einmal ausgeführt und nur eine http-Anfrage wird ausgeführt.



Party-Nachzügler


Was passiert, wenn der ursprüngliche Stream bereits funktioniert hat, bevor wir uns angemeldet haben?

Dies kann im vorherigen Beispiel nicht angezeigt werden, da http asynchron ist. Selbst wenn Sie es unmittelbar danach abonnieren, wird der Wert nach dem Abonnement weiterhin angezeigt.

Lassen Sie uns schnell eine generierende Funktion erstellen of:


function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

Ein mit Mitteln erzeugter Stream of()gibt nacheinander synchron Werte aus. Wir werden abonnieren, subjectnachdem es bereits abonniert wurde.

const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Unsere Abonnenten haben nichts erhalten. Warum? Unsere Implementierung unterstützt keine "späten" Abonnenten. Wenn der ursprüngliche Stream von of()Werte ausgibt, Abonnenten noch nicht registriert sind, werden diese Werte nirgendwo hingehen.

In tatsächlichen Beispielen zu Angular kann es durchaus sein, dass der Quelldatenstrom funktioniert hat, Ihre Komponente jedoch noch nicht auf der Seite vorhanden ist. Wenn die Komponente angezeigt wird, abonniert sie die Quelle, erhält jedoch nicht die bereits übergebenen Werte.

Eine Möglichkeit, das Problem zu lösen, ist diese ReplaySubject. Wir skizzieren die Version und sehen, wie es funktioniert.


class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

Das Konzept ist einfach. Wie der Name schon sagt, ReplaySubjecthandelt es sich um eine spezielle SubjectVersion, die alte Werte für alle neuen Abonnenten reproduzieren kann.

Jeder freigegebene Wert wird an alle aktuellen Abonnenten übertragen und für zukünftige gespeichert. Die Puffergröße wird bufferSizeim Konstruktor festgelegt.

Schreiben Sie das vorherige Beispiel mit neu ReplaySubject.

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Das Ergebnis hat sich geändert.

Trotz des späten Abonnements haben wir sie alle gefangen.



Zusammenfassend ist der Zweck ReplaySubjectdie Verteilung von Werten an alle Abonnenten und das Zwischenspeichern für zukünftige "späte" Abonnenten.

Bevor Sie fortfahren, empfehlen wir Ihnen, Ihre eigene Implementierung zu schreiben BehaviorSubject. Den fertigen Code finden Sie am Ende des Artikels.

Nun kommen wir endlich zu Multicast-Operatoren. Ich hoffe, die obigen Beispiele helfen Ihnen, sie schneller zu verstehen.

Multicast-Operatoren


Multicast und Connect


Der Betreiber multicast() verwendet Subjectden Quellstrom an mehrere Teilnehmer zu erteilen.


import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

multicastGibt ein Objekt ConnectableObservablemit einer Methode zurück connect. Ihr Zweck ist es, den empfangenen Betreff für den Quelldatenstrom zu abonnieren.

Mit dieser Methode connectkönnen wir bestimmen, wann die Ausführung des ursprünglichen Threads gestartet werden soll. Es gibt einen Moment zu beachten - um sich von der Quelle abzumelden, die Sie tun müssen:

connectableSubscription.unsubscribe();

Wir sind nicht auf einfach beschränkt Subject. Stattdessen können Sie eine beliebige abgeleitete Klasse verwenden, zum Beispiel ReplaySubject:

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

Anhand dieses Codes können Sie erraten, was unter der Haube passieren wird.

Wenn wir verwenden multicast, können wir nicht nur Subjecteine Factory-Funktion übertragen, die jedes Mal eine neue zurückgibt Subject.

Wiederverwendet bereits abgeschlossen subjectkann nicht sein, die Werksfunktion löst dieses Problem.

interval(1000).pipe(
  multicast(() => new Subject())
)

Nachzählen


Wenn wir den Operator verwenden multicast(), sind wir für den Aufruf verantwortlich, um die connect()Ausführung des ursprünglichen Observable zu starten. Außerdem müssen wir immer noch nach möglichen Speicherlecks suchen und uns manuell abmelden ConnectableSubscription.

Die Automatisierung des Prozesses würde Fehler vermeiden und den Code vereinfachen. Die freundlichen RxJS-Entwickler haben für uns darüber nachgedacht und einen refCountOperator erstellt.

refCountzählt Abonnements und wenn das erste erscheint, ruft es an connect(), d.h. abonniert. Wenn es auf Null zurückfällt, wird eine Antwort aufgerufen.

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)
 
// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubcribe();
}, 3000)

Beachten Sie, dass, nachdem refCountwir das übliche beobachtbare erhalten haben, nicht ConnectableObservable.

Veröffentlichen und seine Optionen


multicast() + Subject + refCount()Dies ist ein ziemlich typischer Fall in RxJS, und die Entwickler haben ihn auf einen einzigen Operator reduziert.

Mal sehen, welche Möglichkeiten wir haben.

  • publish() Äquivalent multicast(() => new Subject())
    const connectableObservable = interval(1000).pipe(
      publish()
    )
    
    connectableObservable.connect();
    

  • publishBehavior() Äquivalent multicast(new BehaviorSubject())
    const connectableObservable = interval(1000).pipe(
      publishBehavior(100)
    )
    
    connectableObservable.connect();
    

  • publishReplay() Äquivalent multicast(() => new ReplaySubject(x))
    const connectableObservable = interval(1000).pipe(
      publishReplay(3)
    )
    
    connectableObservable.connect();
    

  • publishLast() Äquivalent multicast(new AsyncSubject())
    const connectableObservable = interval(1000).pipe(
      take(2),
      publishLast()
    )
    
    connectableObservable.connect();
    

  • share() Äquivalent multicast(() => new Subject()) + refCount()
    const source = interval(1000).pipe(
      share()
    )
    

  • shareReplay(bufferSize) Dies ist ein Multicast-Operator, der verwendet ReplaySubject. Er hat kein Inneres multicast()und sein Ergebnis ist beobachtbar, nicht ConnectableObservable. Es kann mit refCountoder ohne verwendet werden. Hier sind beide Optionen:

    interval(1000).pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    )
    
    interval(1000).pipe(
      shareReplay(1)
    )
    

Wenn mit shareReplayangerufen wird, { refCount: false }ist wie anrufen shareReplay(x).

In diesem Fall erfolgt keine Referenzzählung. Dies bedeutet, dass der ursprüngliche Stream bis zur Fertigstellung shareReplayabonniert wird, unabhängig davon, ob er selbst die endgültigen Abonnenten hat oder nicht. Alle neuen Abonnenten erhalten die letzten x-Werte.

shareReplay vs PublishReplay + refCount


Auf den ersten Blick ist es shareReplay({ refCount: true, bufferSize: X })identisch publishReplay(X) + refCount() , aber das ist nicht ganz richtig.

Mal sehen, was sind die Ähnlichkeiten und was ist der Unterschied.

Sie haben das gleiche Verhalten refCount: Abonnieren und Abbestellen des ursprünglichen Streams basierend auf der Anzahl der Abonnenten. Sie reagieren auch gleich, wenn der ursprüngliche Stream abgeschlossen ist - alle neuen Abonnenten erhalten X letzte Werte.

Wenn der ursprüngliche Stream jedoch noch nicht finalisiert ist, erhalten in diesem Fall publishReplay(X) + refCount()alle neuen Abonnenten X-Werte aus dem Puffer und werden dann mit demselben neu signiert ReplaySubject.
Wenn wir jedoch die shareReplay({ refCount: true, bufferSize: 1 })letzten X-Werte verwenden, erhalten sie diese nicht, da darin ein neuer Wert erstellt ReplaySubjectund zum erneuten Abonnieren der Quelle verwendet wird.

Beispiele, die dies veranschaulichen:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
 
  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
  
  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);





Aktuelle Beispiele in Angular


Mal sehen, wie man die untersuchten Multicast-Operatoren unter Kampfbedingungen einsetzt.

Wir verwenden Share


Angenommen, wir haben eine Komponente, die Daten aus dem ursprünglichen Stream benötigt. Es kann sich um eine http-Anfrage, einen Status oder was auch immer handeln. Und wir brauchen auch Datenmanipulationen wie Filtern, Sortieren usw.

@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
  }
}

Und jetzt brauchen wir eine andere Komponente, die nur den ersten Benutzer anzeigt. Wenn wir den Quellstrom so abonnieren, wie er ist, dann:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Und jetzt haben wir zwei http-Anfragen. Sortier- oder Filtervorgänge werden zweimal ausgeführt.
Wir bewerben uns share:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
      share()
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Wir wissen bereits, dass er eine neue erstellt Subject, die die Quelle abonniert. Wenn die Quelle sendet, übergibt der Betreff diesen Wert an alle seine Abonnenten.

Das Problem ist behoben, und wenn wir uns angemeldet haben, haben firstUser$wir den internen subjectund nicht den ursprünglichen Stream direkt abonniert .

Verwenden von ShareReplay


ShareReplaygilt, wenn Sie die letzten X-Werte ausgeben, zwischenspeichern und wiederholen müssen. Ein typisches Beispiel ist ein Singleton-Dienst, der eine http-Anforderung ausführt.


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
              .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}

Es spielt keine Rolle, wie viele Komponenten jetzt oder in Zukunft Daten anfordern, es gibt nur eine http-Anforderung und das Ergebnis wird im internen Puffer gespeichert ReplaySubject.
Es kann immer noch vorkommen, dass Sie eine unvollständige Anfrage stornieren müssen, da keine Abonnenten vorhanden sind. Dann müssen Sie sich bewerben refCount.

Den vollständigen Code finden Sie hier .

All Articles