Estudamos operadores RxJS multicast

Olá Habr! Apresento a você a tradução do artigo "Entendendo os operadores de difusão seletiva RxJS" da Netanel Basal.

Os operadores de transmissão ou multicast geralmente parecem o tópico mais difícil de aprender sobre o RxJS. Neste artigo, tentarei explicar tudo de uma maneira acessível.

Vamos considerar a estrutura interna dos operadores multicast e as tarefas que eles resolvem.

Vamos começar descrevendo os blocos de construção básicos do RxJS.

Observável


No RxJS, os objetos observáveis ​​(a seguir denominados "fluxos") são inicialmente frios. Isso significa que toda vez que você assina um fluxo, é efetuado um retorno de chamada da assinatura.

Para um melhor entendimento, crie a seguinte implementação:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

O construtor Observableaceita um único parâmetro - o retorno de chamada da assinatura
subscriptionFn. Ele será chamado toda vez que assinarmos o stream ( subscribe()).

Às vezes, eles também chamam um retorno de chamada de uma assinatura producer, pois também "produz" valores para o assinante (objeto observador em nosso código).

O método subscribe()recebe uma entrada observer. É um objeto com três próprios métodos: next(), error(), complete(). No RxJS ao vivo, você pode passar três funções em vez de um objeto.

O método, subscribe()quando chamado, chama a função de assinatura passando-a para a entrada observer.

Nós não mencionamos o método agoraunsubscribe, mas lembre-se de que cada assinatura fornece uma maneira de destruí-la. Na maioria das vezes, uma assinatura retorna uma função (ou um objeto com um método apropriado), durante o qual a conexão entre o fluxo e seus assinantes é destruída.

Tudo isso é bem simples. Vamos nos aproximar da realidade agora. Por exemplo, agrupe uma API XHR nativa em um fluxo


function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });
    xhr.open('GET', url);
    xhr.send();
    
    return () => xhr.abort()
  }
  
  return new Observable(subscriptionFn);
}

Escrevemos uma função httpque recebe uma URL, executa uma solicitação http e retorna um fluxo que emite a resposta http recebida.

Agora, olhando para nossa implementação, o que você acha que acontecerá quando assinarmos esse fluxo duas vezes?


// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  }
});

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

Corretamente, duas solicitações http serão executadas. Se olharmos novamente para a implementação da classe Observable, veremos por que isso acontece. Cada assinante chama um retorno de chamada de assinatura, que, por sua vez, executa uma solicitação http a cada vez.



Operadores


Um operador é uma função que recebe um fluxo como entrada, executa qualquer ação e retorna um fluxo.

Escreveremos nosso primeiro operador.
function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
        next(value) {
          observer.next(fn(value));
        }
      });
    });
  };
}

A função map()retorna um operador que aceita o fluxo original e retorna um fluxo no qual todos os valores passantes serão passados ​​pela função fn.

Essa. dentro dela, há sempre uma assinatura para o fluxo de entrada.

Antes de usar esse novo operador, precisamos anexá-lo de alguma forma ao fluxo. Estender a nossa classe Observableporpipe()

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

Um método simples, apenas uma linha de código. pipeEle pega um conjunto de operadores e os chama, passando cada entrada como resultado da anterior.

Vamos usar nosso operador:

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

Quando chamada subscribe, uma assinatura do fluxo de saída será executada map()e, por sua vez map, uma assinatura do fluxo original será executada dentro .

O fluxo http emite o valor em que se enquadra map. Em seguida, a função é executada fn, o fluxo mapemite o valor para a assinatura final. Funciona como uma observable chaincorrente de fio.



Se assinarmos uma cadeia duas vezes, cada assinatura da cadeia será chamada duas vezes.

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
    .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));



E se esse comportamento não nos convém? Se quisermos chamar a função de assinatura apenas uma vez, quantas assinaturas teríamos?

Por exemplo, e se quisermos fazer uma solicitação http e usar o resultado para todos os assinantes? Nesse caso, você precisa Subject.

assuntos


Subjecté um fluxo e um assinante. O fluxo - porque tem um método subscribe(), o assinante - porque implementa a interface do assinante - métodos next(), error(), complete().

Vamos escrevê-lo.

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

Subjectpode atuar como um intermediário entre o fluxo frio e muitos assinantes.

Mude o nosso exemplo da seguinte maneira:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

Quando chamada subject.subscribe(someFn), apenas uma operação simples é executada - adicionando uma subject.observersfunção à matriz someFn.

Bem, então, como também Subjectse comporta como assinante, você pode assiná-lo no fluxo original, ou seja, quando o encadeamento original emite um valor, ele é chamado subject.next(), o que implica a transferência desse valor para cada um dos assinantes subject.

Agora, temos o retorno de chamada original da assinatura executado uma vez e apenas uma solicitação http será executada.



Retardatários do partido


O que acontece se o fluxo original já funcionou antes de nos inscrevermos?

Não será possível mostrar isso no exemplo anterior, já que http é assíncrono, mesmo que você o assine imediatamente após, o valor continuará após a assinatura.

Vamos criar rapidamente uma função geradora of:


function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

Um fluxo criado por meio of()emite valores de forma síncrona, um após o outro. Assinaremos subjectdepois que ele já estiver inscrito em.

const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Nossos assinantes não receberam nada. Por quê? Nossa implementação não suporta assinantes "atrasados". Quando o fluxo original de of()emite valores, os assinantes ainda não estão registrados, esses valores não vão a lugar algum.

Em exemplos reais de Angular, pode ser que o fluxo de origem tenha funcionado, mas seu componente ainda não está presente na página. E quando o componente aparece, ele assina a fonte, mas não recebe os valores que já foram transmitidos.

Uma maneira de resolver o problema é essa ReplaySubject. Nós descrevemos sua versão e vemos como ela funciona.


class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

O conceito é simples. Como o nome indica, ReplaySubjectesse é um especial Subjectque pode reproduzir valores antigos para todos os novos assinantes.

Cada valor liberado será transferido para todos os assinantes atuais e salvo para futuros, o tamanho do buffer é bufferSizedefinido no construtor.

Reescreva o exemplo anterior com ReplaySubject.

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

O resultado foi alterado.

Apesar da assinatura tardia, pegamos todos eles.



Resumindo, o objetivo ReplaySubjecté distribuir valores para todos os assinantes e armazená-los em cache para futuros assinantes "atrasados".

Antes de prosseguir, recomendo que você tente escrever sua própria implementação BehaviorSubject. Você pode encontrar o código finalizado no final do artigo.

Agora, finalmente, passamos para os operadores multicast. Espero que os exemplos acima ajudem você a entendê-los mais rapidamente.

Operadores de multicast


Multicast e conexão


O operador multicast() usa Subjectpara emitir o fluxo de origem para vários assinantes.


import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

multicastdevolve um objecto ConnectableObservableque tem um método connect. Seu objetivo é inscrever o assunto recebido no fluxo de origem.

O método connectnos permite determinar quando iniciar a execução do encadeamento original. Lembre-se de um momento - para cancelar a inscrição na fonte que você precisa:

connectableSubscription.unsubscribe();

Não estamos limitados ao simples Subject. Em vez disso, você pode usar qualquer classe derivada, por exemplo ReplaySubject:

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

A partir desse código, você pode adivinhar o que acontecerá sob o capô.

Quando usamos multicast, podemos transferir não apenas Subject, mas também uma função de fábrica, que retorna uma nova a cada vez Subject.

Reutilizado já concluído subjectnão pode ser, a função de fábrica resolve esse problema.

interval(1000).pipe(
  multicast(() => new Subject())
)

Reembolso


Quando usamos o operador multicast(), somos responsáveis ​​pela chamada connect()para iniciar a execução do observável original. Além disso, ainda precisamos monitorar possíveis vazamentos de memória, cancelando a assinatura manualmente de ConnectableSubscription.

A automação do processo evitaria erros e simplificaria o código. O tipo de desenvolvedor do RxJS pensou nisso por nós e criou um refCountoperador.

refCountconta assinaturas e, quando o primeiro aparece, ele chama connect(), ou seja, assina. Quando diminuir de volta para zero, uma resposta será chamada.

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)
 
// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubcribe();
}, 3000)

Note que, depois de refCountobtermos o habitual observável, não ConnectableObservable.

Publicar e suas opções


multicast() + Subject + refCount()esse é um caso bastante típico no RxJS e os desenvolvedores o reduziram para um único operador.

Vamos ver quais opções temos.

  • publish() equivalente multicast(() => new Subject())
    const connectableObservable = interval(1000).pipe(
      publish()
    )
    
    connectableObservable.connect();
    

  • publishBehavior() equivalente multicast(new BehaviorSubject())
    const connectableObservable = interval(1000).pipe(
      publishBehavior(100)
    )
    
    connectableObservable.connect();
    

  • publishReplay() equivalente multicast(() => new ReplaySubject(x))
    const connectableObservable = interval(1000).pipe(
      publishReplay(3)
    )
    
    connectableObservable.connect();
    

  • publishLast() equivalente multicast(new AsyncSubject())
    const connectableObservable = interval(1000).pipe(
      take(2),
      publishLast()
    )
    
    connectableObservable.connect();
    

  • share() equivalente multicast(() => new Subject()) + refCount()
    const source = interval(1000).pipe(
      share()
    )
    

  • shareReplay(bufferSize) Este é um operador multicast que usa ReplaySubject. Ele não tem interior multicast()e seu resultado é observável, não ConnectableObservable. Pode ser usado com refCountou sem ele. Aqui estão as duas opções:

    interval(1000).pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    )
    
    interval(1000).pipe(
      shareReplay(1)
    )
    

Quando shareReplaychamado com { refCount: false }é como chamar shareReplay(x).

Nesse caso, não haverá contagem de referência. Isso significa que, até que o fluxo original seja concluído, ele shareReplayserá inscrito, independentemente de ele próprio possuir os assinantes finais ou não. Todos os novos assinantes receberão os últimos x valores.

shareReplay vs publishReplay + refCount


À primeira vista , é shareReplay({ refCount: true, bufferSize: X })idêntico publishReplay(X) + refCount() , mas isso não é inteiramente verdade.

Vamos ver quais são as semelhanças e qual é a diferença.

Eles têm o mesmo comportamento refCount: inscrever-se e cancelar a inscrição no fluxo original com base no número de assinantes. Eles também reagem da mesma forma quando o fluxo original é concluído - todos os novos assinantes recebem X últimos valores.

No entanto, se o fluxo original ainda não estiver finalizado, neste caso, quando tivermos publishReplay(X) + refCount()- todos os novos assinantes receberão valores X do buffer e serão re-assinados usando o mesmo ReplaySubject.
Mas se usarmos os shareReplay({ refCount: true, bufferSize: 1 })últimos valores X, eles não o receberão, pois dentro dele cria um novo ReplaySubject e o usa para se inscrever novamente na fonte.

Exemplos ilustrando isso:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
 
  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
  
  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);





Exemplos reais em Angular


Vamos ver como usar os operadores multicast estudados em condições de combate.

Usamos compartilhamento


Suponha que tenhamos um componente que precise de dados do fluxo original. Pode ser uma solicitação http, um estado ou qualquer outra coisa. E também precisamos de manipulação de dados, como filtragem, classificação etc.

@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
  }
}

E agora precisamos de outro componente que mostre apenas o primeiro usuário. Se assinarmos o fluxo de origem, então:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

E agora temos duas solicitações HTTP, as operações de classificação ou filtragem serão realizadas duas vezes.
Nós aplicamos share:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
      share()
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Já sabemos que ele cria um novo Subjectque assina a fonte. Quando a fonte é emitida, o assunto passa esse valor para todos os seus assinantes.

O problema foi resolvido e, quando assinamos firstUser$- assinamos subjecto fluxo interno , e não o fluxo original diretamente.

Usando o ShareReplay


ShareReplayaplica-se quando você precisa emitir, armazenar em cache e repetir os últimos valores X. Um exemplo típico é um serviço singleton que executa uma solicitação http.


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
              .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}

Não importa quantos componentes solicitarão dados agora ou no futuro, haverá apenas uma solicitação http e o resultado será salvo no buffer interno ReplaySubject.
Ainda pode haver um caso em que você precisa cancelar uma solicitação incompleta, porque não há assinantes, e você deve se inscrever refCount.

O código completo pode ser encontrado aqui .

All Articles