Estudiamos operadores RxJS multicast

Hola Habr! Les presento la traducci贸n del art铆culo "Comprender los operadores de multidifusi贸n RxJS" de Netanel Basal.

Los operadores de transmisi贸n o multidifusi贸n a menudo parecen ser el tema m谩s dif铆cil de aprender sobre RxJS. En este art铆culo intentar茅 explicar todo de manera accesible.

Consideraremos la estructura interna de los operadores de multidifusi贸n y las tareas que resuelven.

Comencemos describiendo los componentes b谩sicos de RxJS.

Observable


En RxJS, los objetos observables (en adelante denominados "flujos") est谩n inicialmente fr铆os. Esto significa que cada vez que se suscribe a una transmisi贸n, se realiza una devoluci贸n de llamada de la suscripci贸n.

Para una mejor comprensi贸n, cree la siguiente implementaci贸n:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

El constructor Observableacepta un solo par谩metro: la devoluci贸n de llamada de la suscripci贸n
subscriptionFn. Se llamar谩 cada vez que nos suscribamos a stream ( subscribe()).

A veces tambi茅n llaman a una devoluci贸n de llamada de una suscripci贸n producer, ya que tambi茅n "produce" valores para el suscriptor (objeto observador en nuestro c贸digo).

El m茅todo subscribe()toma una entrada observer. Es un objeto con tres propios m茅todos: next(), error(), complete(). En Live RxJS, puede pasar tres funciones en lugar de un objeto.

El m茅todo, subscribe()cuando se llama, llama a la funci贸n de suscripci贸n y lo pasa a la entrada observer.

No mencionamos el m茅todo ahoraunsubscribe, pero debe tenerse en cuenta que cada suscripci贸n proporciona una forma de destruirla. Muy a menudo, una suscripci贸n devuelve una funci贸n (o un objeto con un m茅todo apropiado), durante el cual se destruye la conexi贸n entre la secuencia y sus suscriptores.

Todo esto es bastante simple. Acerqu茅monos m谩s a la realidad ahora. Por ejemplo, envuelva una API XHR nativa en una secuencia


function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });
    xhr.open('GET', url);
    xhr.send();
    
    return () => xhr.abort()
  }
  
  return new Observable(subscriptionFn);
}

Escribimos una funci贸n httpque recibe una URL, ejecuta una solicitud http y devuelve una secuencia que emite la respuesta http recibida.

Ahora, mirando nuestra implementaci贸n, 驴qu茅 crees que suceder谩 cuando nos suscribamos a esta transmisi贸n dos veces?


// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  }
});

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

Correctamente, se ejecutar谩n dos solicitudes http. Si observamos nuevamente la implementaci贸n de la clase Observable, veremos por qu茅 es as铆. Cada suscriptor llama a una devoluci贸n de llamada de suscripci贸n, que a su vez realiza una solicitud http cada vez.



Operadores


Un operador es una funci贸n que toma una secuencia como entrada, realiza cualquier acci贸n y devuelve una secuencia.

Escribiremos nuestro primer operador propio.
function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
        next(value) {
          observer.next(fn(value));
        }
      });
    });
  };
}

La funci贸n map()devuelve un operador que acepta la secuencia original y devuelve una secuencia en la que todos los valores de paso se pasar谩n a trav茅s de la funci贸n fn.

Aquellos. dentro de 茅l siempre hay una suscripci贸n a la secuencia de entrada.

Antes de usar este nuevo operador, necesitamos adjuntarlo de alguna manera a la transmisi贸n. Extender nuestra clase Observableporpipe()

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

Un m茅todo simple, solo una l铆nea de c贸digo. pipeToma una serie de operadores y los llama a su vez, pasando a cada entrada el resultado del anterior.

Usemos nuestro operador:

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

Cuando se llama subscribe, se ejecutar谩 una suscripci贸n a la secuencia de salida map()y, a su vez map, se ejecutar谩 una suscripci贸n a la secuencia original en su interior .

La secuencia http emite el valor en el que cae map. Luego, se ejecuta la funci贸n fn, la secuencia de mapemite el valor a la suscripci贸n final. Funciona como una observable chaincadena de hilos.



Si nos suscribimos a una cadena dos veces, cada suscripci贸n en la cadena se llamar谩 dos veces.

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
    .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));



驴Y si este comportamiento no nos conviene? Si queremos llamar a la funci贸n de suscripci贸n solo una vez, 驴cu谩ntas suscripciones tendr铆amos?

Por ejemplo, 驴qu茅 pasa si queremos hacer una solicitud http y usar el resultado para todos los suscriptores? En este caso lo necesitas Subject.

Asignaturas


SubjectEs tanto una transmisi贸n como un suscriptor. El flujo - porque tiene un m茅todo subscribe(), el suscriptor - porque implementa la interfaz del suscriptor - m茅todos next(), error(), complete().

Vamos a escribirlo.

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

Subjectpuede actuar como intermediario entre la transmisi贸n en fr铆o y muchos suscriptores.

Cambie nuestro ejemplo de la siguiente manera:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

Cuando se llama subject.subscribe(someFn), solo se realiza una operaci贸n simple: agregar una subject.observersfunci贸n a la matriz someFn.

Bueno, entonces, dado que tambi茅n se Subjectcomporta como suscriptor, puede suscribirse a la transmisi贸n original, es decir cuando el hilo original emite un valor, se llama subject.next(), lo que implica la transferencia de este valor a cada uno de los suscriptores subject.

Ahora tenemos que la devoluci贸n de llamada original de la suscripci贸n se ejecuta una vez, y solo se ejecutar谩 una solicitud http.



Llegados tarde a la fiesta


驴Qu茅 sucede si la transmisi贸n original ya funcion贸 antes de registrarnos?

No ser谩 posible mostrar esto en el ejemplo anterior, ya que http es as铆ncrono, incluso si se suscribe inmediatamente despu茅s, el valor seguir谩 apareciendo despu茅s de la suscripci贸n.

Creemos r谩pidamente una funci贸n generadora of:


function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

Una secuencia creada por medios of()emite valores sincr贸nicamente, uno despu茅s del otro. Nos suscribiremos subjectdespu茅s de que ya se haya suscrito a.

const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Nuestros suscriptores no han recibido nada. 驴Por qu茅? Nuestra implementaci贸n no admite suscriptores "tard铆os". Cuando la secuencia original de of()emite valores, los suscriptores a煤n no est谩n registrados, estos valores no ir谩n a ninguna parte.

En ejemplos reales en Angular, puede ser que la secuencia de origen funcion贸, pero su componente a煤n no est谩 presente en la p谩gina. Y cuando aparece el componente, se suscribe a la fuente, pero no recibe los valores que ya han pasado.

Una forma de resolver el problema es esta ReplaySubject. Esbozamos su versi贸n y vemos c贸mo funciona.


class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

El concepto es simple. Como su nombre lo indica, ReplaySubjecteste es Subjectuno especial que puede reproducir valores antiguos a todos los suscriptores nuevos.

Cada valor liberado se transferir谩 a todos los suscriptores actuales y se guardar谩 para los futuros, el tama帽o del b煤fer se bufferSizeestablece en el constructor.

Reescribe el ejemplo anterior con ReplaySubject.

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

El resultado ha cambiado.

A pesar de la suscripci贸n tard铆a, los atrapamos a todos.



En resumen, el prop贸sito ReplaySubjectes la distribuci贸n de valores a todos los suscriptores y almacenarlos en cach茅 para futuros suscriptores "tard铆os".

Antes de continuar, le recomiendo que intente escribir su propia implementaci贸n BehaviorSubject. Puede encontrar el c贸digo terminado al final del art铆culo.

Ahora finalmente pasamos a operadores de multidifusi贸n. Espero que los ejemplos anteriores te ayuden a entenderlos m谩s r谩pido.

Operadores de multidifusi贸n


Multicast y Connect


El operador multicast() utiliza Subjectpara emitir la secuencia de origen a varios suscriptores.


import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

multicastdevuelve un objeto ConnectableObservableque tiene un m茅todo connect. Su prop贸sito es suscribir el asunto recibido a la secuencia fuente.

El m茅todo connectnos permite determinar cu谩ndo comenzar la ejecuci贸n del hilo original. Hay un momento a tener en cuenta: para darse de baja de la fuente que necesita hacer:

connectableSubscription.unsubscribe();

No estamos limitados a lo simple Subject. En su lugar, puede usar cualquier clase derivada, por ejemplo ReplaySubject:

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

A partir de este c贸digo, puede adivinar lo que suceder谩 debajo del cap贸.

Cuando usamos multicast, podemos transferir no solo Subject, sino tambi茅n una funci贸n de f谩brica, que devuelve una nueva cada vez Subject.

Reutilizado ya completado subjectno puede ser, la funci贸n de f谩brica resuelve este problema.

interval(1000).pipe(
  multicast(() => new Subject())
)

Refundir


Cuando usamos el operador multicast(), somos responsables de la llamada connect()para iniciar la ejecuci贸n del observable original. Adem谩s, a煤n tenemos que monitorear posibles fugas de memoria, cancelando la suscripci贸n manualmente ConnectableSubscription.

La automatizaci贸n del proceso evitar铆a errores y simplificar铆a el c贸digo. Los amables desarrolladores de RxJS lo pensaron por nosotros y crearon un refCountoperador.

refCountcuenta las suscripciones y cuando aparece la primera, llama connect(), es decir se suscribe Cuando disminuye de nuevo a cero, se llamar谩 a una respuesta.

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)
 
// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubcribe();
}, 3000)

Tenga en cuenta que despu茅s de refCountobtener el observable habitual, no ConnectableObservable.

Publicar y sus opciones


multicast() + Subject + refCount()Este es un caso bastante t铆pico en RxJS y los desarrolladores lo han reducido a un solo operador.

Veamos qu茅 opciones tenemos.

  • publish() equivalente multicast(() => new Subject())
    const connectableObservable = interval(1000).pipe(
      publish()
    )
    
    connectableObservable.connect();
    

  • publishBehavior() equivalente multicast(new BehaviorSubject())
    const connectableObservable = interval(1000).pipe(
      publishBehavior(100)
    )
    
    connectableObservable.connect();
    

  • publishReplay() equivalente multicast(() => new ReplaySubject(x))
    const connectableObservable = interval(1000).pipe(
      publishReplay(3)
    )
    
    connectableObservable.connect();
    

  • publishLast() equivalente multicast(new AsyncSubject())
    const connectableObservable = interval(1000).pipe(
      take(2),
      publishLast()
    )
    
    connectableObservable.connect();
    

  • share() equivalente multicast(() => new Subject()) + refCount()
    const source = interval(1000).pipe(
      share()
    )
    

  • shareReplay(bufferSize) Este es un operador de multidifusi贸n que utiliza ReplaySubject. No tiene adentro multicast()y su resultado es observable, no ConnectableObservable. Se puede usar con refCounto sin 茅l. Aqu铆 hay dos opciones:

    interval(1000).pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    )
    
    interval(1000).pipe(
      shareReplay(1)
    )
    

Cuando se shareReplayllama con { refCount: false }es como llamar shareReplay(x).

En este caso, no habr谩 recuento de referencias. Esto significa que hasta que se complete la transmisi贸n original, se shareReplaysuscribir谩 a ella, independientemente de si tiene los suscriptores finales o no. Todos los nuevos suscriptores recibir谩n los 煤ltimos valores de x.

shareReplay vs PublicarReplay + refCount


A primera vista , es shareReplay({ refCount: true, bufferSize: X })id茅ntico publishReplay(X) + refCount() , pero esto no es del todo cierto.

Veamos cu谩les son las similitudes y cu谩l es la diferencia.

Tienen el mismo comportamiento refCount: suscribirse y darse de baja de la transmisi贸n original en funci贸n del n煤mero de suscriptores. Tambi茅n reaccionan de la misma manera cuando se completa la transmisi贸n original: todos los suscriptores nuevos reciben X 煤ltimos valores.

Sin embargo, si el flujo original a煤n no est谩 finalizado, en este caso cuando lo hayamos hecho publishReplay(X) + refCount(), todos los suscriptores nuevos recibir谩n valores X del b煤fer y luego se volver谩n a firmar con el mismo ReplaySubject.
Pero si usamos los shareReplay({ refCount: true, bufferSize: 1 })煤ltimos valores de X, no lo obtendr谩n, ya que en su interior crea uno nuevo ReplaySubject y lo usa para volver a suscribirse a la fuente.

Ejemplos que ilustran esto:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
 
  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
  
  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);





Ejemplos reales en angular


Veamos c贸mo usar los operadores de multidifusi贸n estudiados en condiciones de combate.

Usamos compartir


Supongamos que tenemos un componente que necesita datos de la secuencia original. Podr铆a ser una solicitud http, un estado o lo que sea. Y tambi茅n necesitamos manipulaci贸n de datos, como filtrado, clasificaci贸n, etc.

@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
  }
}

Y ahora necesitamos otro componente que muestre solo el primer usuario. Si nos suscribimos a la secuencia de origen tal como est谩, entonces:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Y ahora tenemos dos solicitudes http, las operaciones de clasificaci贸n o filtrado se realizar谩n dos veces.
Aplicamos share:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
      share()
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Ya sabemos que crea Subjectuno nuevo que se suscribe a la fuente. Cuando se emite la fuente, el sujeto pasa este valor a todos sus suscriptores.

El problema est谩 resuelto y, cuando nos suscribimos firstUser$, nos suscribimos subjectdirectamente a la transmisi贸n interna y no a la transmisi贸n original.

Usando ShareReplay


ShareReplayse aplica cuando necesita emitir, almacenar en cach茅 y repetir los 煤ltimos valores de X. Un ejemplo t铆pico es un servicio singleton que realiza una solicitud http.


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
              .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}

No importa cu谩ntos componentes solicitar谩n datos ahora o en el futuro, solo habr谩 una solicitud http y el resultado se guardar谩 en el b煤fer interno ReplaySubject.
Todav铆a puede haber un caso en el que deba cancelar una solicitud incompleta, ya que no hay suscriptores, entonces deber谩 presentar una solicitud refCount.

El c贸digo completo se puede encontrar aqu铆 .

All Articles