Olá Habr! Apresento a você a tradução do artigo "Entendendo os operadores de difusão seletiva RxJS" da Netanel Basal.Os operadores de transmissão ou multicast geralmente parecem o tópico mais difícil de aprender sobre o RxJS. Neste artigo, tentarei explicar tudo de uma maneira acessível.Vamos considerar a estrutura interna dos operadores multicast e as tarefas que eles resolvem.Vamos começar descrevendo os blocos de construção básicos do RxJS.Observável
No RxJS, os objetos observáveis (a seguir denominados "fluxos") são inicialmente frios. Isso significa que toda vez que você assina um fluxo, é efetuado um retorno de chamada da assinatura.Para um melhor entendimento, crie a seguinte implementação:class Observable {
constructor(subscriptionFn) {
this.subscriptionFn = subscriptionFn;
}
subscribe(observer) {
return this.subscriptionFn(observer);
}
}
O construtor Observable
aceita um único parâmetro - o retorno de chamada da assinaturasubscriptionFn
. Ele será chamado toda vez que assinarmos o stream ( subscribe()
).Às vezes, eles também chamam um retorno de chamada de uma assinatura producer
, pois também "produz" valores para o assinante (objeto observador em nosso código).O método subscribe()
recebe uma entrada observer
. É um objeto com três próprios métodos: next(), error(), complete()
. No RxJS ao vivo, você pode passar três funções em vez de um objeto.O método, subscribe()
quando chamado, chama a função de assinatura passando-a para a entrada observer
.Nós não mencionamos o método agoraunsubscribe
, mas lembre-se de que cada assinatura fornece uma maneira de destruí-la. Na maioria das vezes, uma assinatura retorna uma função (ou um objeto com um método apropriado), durante o qual a conexão entre o fluxo e seus assinantes é destruída.Tudo isso é bem simples. Vamos nos aproximar da realidade agora. Por exemplo, agrupe uma API XHR nativa em um fluxo
function http(url) {
const subscriptionFn = observer => {
log('Observable execution: http');
const xhr = new XMLHttpRequest();
xhr.addEventListener('load', () => {
if (xhr.readyState === 4 && xhr.status === 200) {
observer.next(JSON.parse(xhr.responseText));
observer.complete();
}
});
xhr.open('GET', url);
xhr.send();
return () => xhr.abort()
}
return new Observable(subscriptionFn);
}
Escrevemos uma função http
que recebe uma URL, executa uma solicitação http e retorna um fluxo que emite a resposta http recebida.Agora, olhando para nossa implementação, o que você acha que acontecerá quando assinarmos esse fluxo duas vezes?
const observer = tag => ({
next(value) {
console.log(`${tag}:`, value);
}
});
http('https://jsonplaceholder.typicode.com/users')
.subscribe(observer('subscriber-1'));
http('https://jsonplaceholder.typicode.com/users')
.subscribe(observer('subscriber-2'));
Corretamente, duas solicitações http serão executadas. Se olharmos novamente para a implementação da classe Observable, veremos por que isso acontece. Cada assinante chama um retorno de chamada de assinatura, que, por sua vez, executa uma solicitação http a cada vez.
Operadores
Um operador é uma função que recebe um fluxo como entrada, executa qualquer ação e retorna um fluxo.Escreveremos nosso primeiro operador.function map(fn) {
return source => {
return new Observable(observer => {
log('Observable execution: map');
return source.subscribe({
next(value) {
observer.next(fn(value));
}
});
});
};
}
A função map()
retorna um operador que aceita o fluxo original e retorna um fluxo no qual todos os valores passantes serão passados pela função fn
.Essa. dentro dela, há sempre uma assinatura para o fluxo de entrada.Antes de usar esse novo operador, precisamos anexá-lo de alguma forma ao fluxo. Estender a nossa classe Observable
porpipe()
class Observable {
constructor(subscriptionFn) {
this.subscriptionFn = subscriptionFn;
}
subscribe(observer) {
return this.subscriptionFn(observer);
}
pipe(...operators) {
return operators.reduce((source, next) => next(source), this);
}
}
Um método simples, apenas uma linha de código. pipe
Ele pega um conjunto de operadores e os chama, passando cada entrada como resultado da anterior.Vamos usar nosso operador:http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]))
.subscribe(observer('subscriber'));
Quando chamada subscribe
, uma assinatura do fluxo de saída será executada map()
e, por sua vez map
, uma assinatura do fluxo original será executada dentro .O fluxo http emite o valor em que se enquadra map
. Em seguida, a função é executada fn
, o fluxo map
emite o valor para a assinatura final. Funciona como uma observable chain
corrente de fio.
Se assinarmos uma cadeia duas vezes, cada assinatura da cadeia será chamada duas vezes.const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]));
firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));
E se esse comportamento não nos convém? Se quisermos chamar a função de assinatura apenas uma vez, quantas assinaturas teríamos?Por exemplo, e se quisermos fazer uma solicitação http e usar o resultado para todos os assinantes? Nesse caso, você precisa Subject
.assuntos
Subject
é um fluxo e um assinante. O fluxo - porque tem um método subscribe()
, o assinante - porque implementa a interface do assinante - métodos next(), error(), complete().
Vamos escrevê-lo.class Subject extends Observable {
constructor() {
super();
this.observers = [];
}
subscribe(observer) {
this.observers.push(observer);
}
next(value) {
this.observers.forEach(observer => observer.next(value));
}
error(error) {
this.observers.forEach(observer => observer.error(error));
}
complete() {
this.observers.forEach(observer => observer.complete());
}
}
Subject
pode atuar como um intermediário entre o fluxo frio e muitos assinantes.Mude o nosso exemplo da seguinte maneira:const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]))
.subscribe(subject);
Quando chamada subject.subscribe(someFn)
, apenas uma operação simples é executada - adicionando uma subject.observers
função à matriz someFn
.Bem, então, como também Subject
se comporta como assinante, você pode assiná-lo no fluxo original, ou seja, quando o encadeamento original emite um valor, ele é chamado subject.next()
, o que implica a transferência desse valor para cada um dos assinantes subject
.Agora, temos o retorno de chamada original da assinatura executado uma vez e apenas uma solicitação http será executada.
Retardatários do partido
O que acontece se o fluxo original já funcionou antes de nos inscrevermos?Não será possível mostrar isso no exemplo anterior, já que http é assíncrono, mesmo que você o assine imediatamente após, o valor continuará após a assinatura.Vamos criar rapidamente uma função geradora of
:
function of(...values) {
return new Observable(observer => {
log('Observable execution: of');
values.forEach(value => observer.next(value));
});
}
Um fluxo criado por meio of()
emite valores de forma síncrona, um após o outro. Assinaremos subject
depois que ele já estiver inscrito em.const subject = new Subject();
of(1, 2, 3).subscribe(subject);
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
Nossos assinantes não receberam nada. Por quê? Nossa implementação não suporta assinantes "atrasados". Quando o fluxo original de of()
emite valores, os assinantes ainda não estão registrados, esses valores não vão a lugar algum.Em exemplos reais de Angular, pode ser que o fluxo de origem tenha funcionado, mas seu componente ainda não está presente na página. E quando o componente aparece, ele assina a fonte, mas não recebe os valores que já foram transmitidos.Uma maneira de resolver o problema é essa ReplaySubject
. Nós descrevemos sua versão e vemos como ela funciona.
class ReplaySubject extends Subject {
constructor(bufferSize) {
super();
this.observers = [];
this.bufferSize = bufferSize;
this.buffer = [];
}
subscribe(observer) {
this.buffer.forEach(val => observer.next(val));
this.observers.push(observer);
}
next(value) {
if (this.buffer.length === this.bufferSize) {
this.buffer.shift();
}
this.buffer.push(value);
this.observers.forEach(observer => observer.next(value));
}
}
O conceito é simples. Como o nome indica, ReplaySubject
esse é um especial Subject
que pode reproduzir valores antigos para todos os novos assinantes.Cada valor liberado será transferido para todos os assinantes atuais e salvo para futuros, o tamanho do buffer é bufferSize
definido no construtor.Reescreva o exemplo anterior com ReplaySubject
.const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
O resultado foi alterado.Apesar da assinatura tardia, pegamos todos eles.
Resumindo, o objetivo ReplaySubject
é distribuir valores para todos os assinantes e armazená-los em cache para futuros assinantes "atrasados".Antes de prosseguir, recomendo que você tente escrever sua própria implementação BehaviorSubject
. Você pode encontrar o código finalizado no final do artigo.Agora, finalmente, passamos para os operadores multicast. Espero que os exemplos acima ajudem você a entendê-los mais rapidamente.Operadores de multicast
Multicast e conexão
O operador multicast()
usa Subject
para emitir o fluxo de origem para vários assinantes.
import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';
const connectableObservable = interval(1000).pipe(
multicast(new Subject())
)
const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);
const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
.connect();
multicast
devolve um objecto ConnectableObservable
que tem um método connect
. Seu objetivo é inscrever o assunto recebido no fluxo de origem.O método connect
nos permite determinar quando iniciar a execução do encadeamento original. Lembre-se de um momento - para cancelar a inscrição na fonte que você precisa:connectableSubscription.unsubscribe();
Não estamos limitados ao simples Subject
. Em vez disso, você pode usar qualquer classe derivada, por exemplo ReplaySubject
:import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';
const connectableObservable = interval(1000).pipe(
multicast(new ReplaySubject(1))
)
const observer1 = connectableObservable.subscribe(log);
setTimeout(() => {
connectableObservable.subscribe(log);
}, 3000)
const connectable = (connectableObservable as ConnectableObservable<any>).connect();
A partir desse código, você pode adivinhar o que acontecerá sob o capô.Quando usamos multicast
, podemos transferir não apenas Subject
, mas também uma função de fábrica, que retorna uma nova a cada vez Subject
.Reutilizado já concluído subject
não pode ser, a função de fábrica resolve esse problema.interval(1000).pipe(
multicast(() => new Subject())
)
Reembolso
Quando usamos o operador multicast()
, somos responsáveis pela chamada connect()
para iniciar a execução do observável original. Além disso, ainda precisamos monitorar possíveis vazamentos de memória, cancelando a assinatura manualmente de ConnectableSubscription
.A automação do processo evitaria erros e simplificaria o código. O tipo de desenvolvedor do RxJS pensou nisso por nós e criou um refCount
operador.refCount
conta assinaturas e, quando o primeiro aparece, ele chama connect()
, ou seja, assina. Quando diminuir de volta para zero, uma resposta será chamada.const source = interval(1000).pipe(
multicast(new Subject()),
refCount()
)
const observer1 = source.subscribe(log);
const observer2 = source.subscribe(log);
setTimeout(() => {
observer1.unsubscribe();
observer2.unsubscribe();
}, 3000)
Note que, depois de refCount
obtermos o habitual observável, não ConnectableObservable
.Publicar e suas opções
multicast() + Subject + refCount()
esse é um caso bastante típico no RxJS e os desenvolvedores o reduziram para um único operador.Vamos ver quais opções temos.publish()
equivalente multicast(() => new Subject())
const connectableObservable = interval(1000).pipe(
publish()
)
connectableObservable.connect();
publishBehavior()
equivalente multicast(new BehaviorSubject())
const connectableObservable = interval(1000).pipe(
publishBehavior(100)
)
connectableObservable.connect();
publishReplay()
equivalente multicast(() => new ReplaySubject(x))
const connectableObservable = interval(1000).pipe(
publishReplay(3)
)
connectableObservable.connect();
publishLast()
equivalente multicast(new AsyncSubject())
const connectableObservable = interval(1000).pipe(
take(2),
publishLast()
)
connectableObservable.connect();
share()
equivalente multicast(() => new Subject()) + refCount()
const source = interval(1000).pipe(
share()
)
shareReplay(bufferSize)
Este é um operador multicast que usa ReplaySubject
. Ele não tem interior multicast()
e seu resultado é observável, não ConnectableObservable
. Pode ser usado com refCount
ou sem ele. Aqui estão as duas opções:
interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 })
)
interval(1000).pipe(
shareReplay(1)
)
Quando shareReplay
chamado com { refCount: false }
é como chamar shareReplay(x)
.Nesse caso, não haverá contagem de referência. Isso significa que, até que o fluxo original seja concluído, ele shareReplay
será inscrito, independentemente de ele próprio possuir os assinantes finais ou não. Todos os novos assinantes receberão os últimos x valores.shareReplay vs publishReplay + refCount
À primeira vista , é shareReplay({ refCount: true, bufferSize: X })
idêntico publishReplay(X) + refCount()
, mas isso não é inteiramente verdade.Vamos ver quais são as semelhanças e qual é a diferença.Eles têm o mesmo comportamento refCount
: inscrever-se e cancelar a inscrição no fluxo original com base no número de assinantes. Eles também reagem da mesma forma quando o fluxo original é concluído - todos os novos assinantes recebem X últimos valores.No entanto, se o fluxo original ainda não estiver finalizado, neste caso, quando tivermos publishReplay(X) + refCount()
- todos os novos assinantes receberão valores X do buffer e serão re-assinados usando o mesmo ReplaySubject
.Mas se usarmos os shareReplay({ refCount: true, bufferSize: 1 })
últimos valores X, eles não o receberão, pois dentro dele cria um novo ReplaySubject
e o usa para se inscrever novamente na fonte.Exemplos ilustrando isso:const source = interval(1000).pipe(
publishReplay(1),
refCount()
);
const one = source.subscribe(observer('subcriber-1'));
setTimeout(() => {
one.unsubscribe();
const two = source.subscribe(observer('subcriber-2'));
}, 3000);
const source = interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 })
);
const one = source.subscribe(observer('subcriber-1'));
setTimeout(() => {
one.unsubscribe();
const two = source.subscribe(observer('subcriber-2'));
}, 3000);


Exemplos reais em Angular
Vamos ver como usar os operadores multicast estudados em condições de combate.Usamos compartilhamento
Suponha que tenhamos um componente que precise de dados do fluxo original. Pode ser uma solicitação http, um estado ou qualquer outra coisa. E também precisamos de manipulação de dados, como filtragem, classificação etc.@Component({
template: `
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
);
}
}
E agora precisamos de outro componente que mostre apenas o primeiro usuário. Se assinarmos o fluxo de origem, então:@Component({
template: `
<user [user]="firstUser$ | async"></user>
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
firstUser$: Observable<User>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
);
this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
}
}
E agora temos duas solicitações HTTP, as operações de classificação ou filtragem serão realizadas duas vezes.Nós aplicamos share
:@Component({
template: `
<user [user]="firstUser$ | async"></user>
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
firstUser$: Observable<User>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
share()
);
this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
}
}
Já sabemos que ele cria um novo Subject
que assina a fonte. Quando a fonte é emitida, o assunto passa esse valor para todos os seus assinantes.O problema foi resolvido e, quando assinamos firstUser$
- assinamos subject
o fluxo interno , e não o fluxo original diretamente.Usando o ShareReplay
ShareReplay
aplica-se quando você precisa emitir, armazenar em cache e repetir os últimos valores X. Um exemplo típico é um serviço singleton que executa uma solicitação http.
@Injectable({ providedIn: 'root' })
export class BlogService {
posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
.pipe(shareReplay(1));
constructor(private http: HttpClient) {}
}
Não importa quantos componentes solicitarão dados agora ou no futuro, haverá apenas uma solicitação http e o resultado será salvo no buffer interno ReplaySubject
.Ainda pode haver um caso em que você precisa cancelar uma solicitação incompleta, porque não há assinantes, e você deve se inscrever refCount
.O código completo pode ser encontrado aqui .