哈Ha!我向您呈现Netanel Basal 撰写的文章“ Understanding RxJS Multicast Operators”的翻译。广播或多播运算符通常似乎是学习RxJS的最难的话题。在本文中,我将尝试以一种易于访问的方式来解释一切。我们将考虑多播运营商的内部结构及其解决的任务。让我们从描述RxJS的基本构建块开始。可观察的
在RxJS中,可观察对象(以下称为“流”)最初是冷的。这意味着每次您订阅流时,都会执行订阅的回调。为了更好地理解,请创建以下实现:class Observable {
constructor(subscriptionFn) {
this.subscriptionFn = subscriptionFn;
}
subscribe(observer) {
return this.subscriptionFn(observer);
}
}
构造函数Observable
接受一个参数-订阅的回调subscriptionFn
。每次我们订阅流(subscribe()
)都会调用它。有时他们会调用订阅的回调producer
,因为它还会为订阅者(我们代码中的观察者对象)``产生''值。该方法subscribe()
接受输入observer
。它是一个具有以下三种方法的对象:next(), error(), complete()
。在实时RxJS中,您可以传递三个函数而不是对象。该方法subscribe()
在调用时会调用订阅函数,将其传递给input observer
。我们现在没有提到该方法unsubscribe
,但请记住,每个订阅都提供了一种销毁它的方法。通常,订阅返回一个函数(或具有适当方法的对象),在此过程中,流及其订阅者之间的连接被破坏。这一切都非常简单。现在让我们更接近现实。例如,将本机XHR API包装在流中
function http(url) {
const subscriptionFn = observer => {
log('Observable execution: http');
const xhr = new XMLHttpRequest();
xhr.addEventListener('load', () => {
if (xhr.readyState === 4 && xhr.status === 200) {
observer.next(JSON.parse(xhr.responseText));
observer.complete();
}
});
xhr.open('GET', url);
xhr.send();
return () => xhr.abort()
}
return new Observable(subscriptionFn);
}
我们编写了一个函数http
,该函数接收URL,执行http请求并返回发出接收到的http响应的流。现在,看看我们的实现,当我们两次订阅此流时,您认为会发生什么?
const observer = tag => ({
next(value) {
console.log(`${tag}:`, value);
}
});
http('https://jsonplaceholder.typicode.com/users')
.subscribe(observer('subscriber-1'));
http('https://jsonplaceholder.typicode.com/users')
.subscribe(observer('subscriber-2'));
正确地,将执行两个http请求。如果我们再看一下Observable类的实现,我们将明白为什么会这样。每个订阅者都调用一个订阅回调,该回调又每次都执行一个http请求。
经营者
运算符是将流作为输入,执行任何操作并返回流的函数。我们将编写我们自己的第一个运算符。function map(fn) {
return source => {
return new Observable(observer => {
log('Observable execution: map');
return source.subscribe({
next(value) {
observer.next(fn(value));
}
});
});
};
}
该函数map()
返回一个接受原始流的运算符,并返回其中所有传递值都将通过该函数传递的流fn
。那些。在其中始终有对输入流的订阅。在使用此新运算符之前,我们需要以某种方式将其附加到流中。扩展我们班Observable
由pipe()
class Observable {
constructor(subscriptionFn) {
this.subscriptionFn = subscriptionFn;
}
subscribe(observer) {
return this.subscriptionFn(observer);
}
pipe(...operators) {
return operators.reduce((source, next) => next(source), this);
}
}
一种简单的方法,只需一行代码。pipe
它需要一组运算符并依次调用它们,并将每个输入的结果传递给前一个结果。让我们使用我们的运算符:http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]))
.subscribe(observer('subscriber'));
调用时subscribe
,将执行对输出流的订阅map()
,然后map
,将在内部执行对原始流的订阅。http流发出其所属的值map
。然后,执行该函数fn
,流从map
发出值到最终订阅。它像observable chain
线程链一样工作。
如果我们两次订阅一个链,则该链中的每个订阅都会被调用两次。const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]));
firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));
如果这种行为不适合我们?如果我们只想调用一次订阅功能,那么我们将拥有多少个订阅?例如,如果我们要发出一个http请求并将结果用于所有订阅者,该怎么办?在这种情况下,您需要Subject
。科目
Subject
它既是流又是订户。流-因为它有一个方法subscribe()
,订户-因为它实现了订户接口-方法next(), error(), complete().
让我们来编写它。class Subject extends Observable {
constructor() {
super();
this.observers = [];
}
subscribe(observer) {
this.observers.push(observer);
}
next(value) {
this.observers.forEach(observer => observer.next(value));
}
error(error) {
this.observers.forEach(observer => observer.error(error));
}
complete() {
this.observers.forEach(observer => observer.complete());
}
}
Subject
可以充当冷流和许多订户之间的中介。更改我们的示例,如下所示:const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
http('https://jsonplaceholder.typicode.com/users')
.pipe(map(res => res[0]))
.subscribe(subject);
调用时subject.subscribe(someFn)
,仅执行一个简单的操作-向数组添加一个subject.observers
函数someFn
。那么,既然它也Subject
充当订阅者,那么您可以将其订阅到原始流,即 当原始线程发出一个值时,它将被调用subject.next()
,这需要将该值传输到每个订户subject
。现在我们已经执行了订阅的原始回调一次,并且只会执行一个http请求。
派对晚宴者
如果原始流在我们注册之前已经起作用了,该怎么办?在上一个示例中将无法显示此内容,因为http是异步的,即使您在紧随其后进行预订,该值仍将在预订之后出现。让我们快速创建一个生成函数of
:
function of(...values) {
return new Observable(observer => {
log('Observable execution: of');
values.forEach(value => observer.next(value));
});
}
通过手段创建的流of()
会一个接一个地发出值。我们subject
已经订阅了,我们将对其进行订阅。const subject = new Subject();
of(1, 2, 3).subscribe(subject);
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
我们的订户没有收到任何东西。为什么?我们的实现不支持“后期”订阅者。当原始流从中of()
发出值时,尚未注册订阅者,这些值将无处可寻。在Angular上的实际示例中,很可能是源流起作用了,但是页面上还没有组件。并且当组件出现时,它订阅源,但不接收已经传递的值。解决问题的一种方法是这样ReplaySubject
。我们概述其版本,并查看其工作方式。
class ReplaySubject extends Subject {
constructor(bufferSize) {
super();
this.observers = [];
this.bufferSize = bufferSize;
this.buffer = [];
}
subscribe(observer) {
this.buffer.forEach(val => observer.next(val));
this.observers.push(observer);
}
next(value) {
if (this.buffer.length === this.bufferSize) {
this.buffer.shift();
}
this.buffer.push(value);
this.observers.forEach(observer => observer.next(value));
}
}
这个概念很简单。顾名思义,ReplaySubject
这是一种特殊的功能Subject
,可以为所有新订户重现旧值。每个释放的值将被转移到所有当前的订户,并保存以备将来使用,缓冲区大小bufferSize
在构造函数中设置。用重写前面的示例ReplaySubject
。const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));
结果已更改。尽管订阅较晚,但我们都抓住了他们。
总而言之,目的ReplaySubject
是将价值分配给所有订户并将其缓存给将来的``后期''订户。在继续之前,建议您尝试编写自己的实现BehaviorSubject
。您可以在文章末尾找到完成的代码。现在,我们终于转到多播运算符。希望以上示例可以帮助您更快地理解它们。组播运营商
组播和连接
运营商multicast()
用于Subject
将源流发布给多个订户。
import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';
const connectableObservable = interval(1000).pipe(
multicast(new Subject())
)
const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);
const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
.connect();
multicast
返回ConnectableObservable
具有方法的对象connect
。其目的是为接收到的主题订阅源流。该方法connect
使我们能够确定何时开始执行原始线程。有个时刻需要牢记-取消订阅您需要做的消息:connectableSubscription.unsubscribe();
我们不仅限于简单Subject
。相反,您可以使用任何派生类,例如ReplaySubject
:import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';
const connectableObservable = interval(1000).pipe(
multicast(new ReplaySubject(1))
)
const observer1 = connectableObservable.subscribe(log);
setTimeout(() => {
connectableObservable.subscribe(log);
}, 3000)
const connectable = (connectableObservable as ConnectableObservable<any>).connect();
从此代码中,您可以猜测幕后情况。使用时multicast
,我们不仅可以传递Subject
工厂函数,还可以传递工厂函数,该函数每次都会返回一个新函数Subject
。重用已经完成的subject
不能,工厂功能解决了这个问题。interval(1000).pipe(
multicast(() => new Subject())
)
翻新
当我们使用operator时multicast()
,我们负责调用connect()
以开始执行原始的可观察对象。另外,我们仍然必须监视可能的内存泄漏,并从中手动退订ConnectableSubscription
。流程的自动化将避免错误并简化代码。善良的RxJS开发人员为我们考虑了这一点,并创建了一个refCount
运算符。refCount
计算订阅数,当第一个出现时,它会调用connect()
,即 订阅。当它减少回零时,将调用一个响应。const source = interval(1000).pipe(
multicast(new Subject()),
refCount()
)
const observer1 = source.subscribe(log);
const observer2 = source.subscribe(log);
setTimeout(() => {
observer1.unsubscribe();
observer2.unsubscribe();
}, 3000)
请注意,在refCount
得到通常的可观察值之后,不是ConnectableObservable
。发布及其选项
multicast() + Subject + refCount()
这在RxJS中是相当典型的情况,开发人员已将其简化为单个运算符。让我们看看我们有哪些选择。publish()
当量 multicast(() => new Subject())
const connectableObservable = interval(1000).pipe(
publish()
)
connectableObservable.connect();
publishBehavior()
当量 multicast(new BehaviorSubject())
const connectableObservable = interval(1000).pipe(
publishBehavior(100)
)
connectableObservable.connect();
publishReplay()
当量 multicast(() => new ReplaySubject(x))
const connectableObservable = interval(1000).pipe(
publishReplay(3)
)
connectableObservable.connect();
publishLast()
当量 multicast(new AsyncSubject())
const connectableObservable = interval(1000).pipe(
take(2),
publishLast()
)
connectableObservable.connect();
share()
当量 multicast(() => new Subject()) + refCount()
const source = interval(1000).pipe(
share()
)
shareReplay(bufferSize)
这是使用的多播运算符ReplaySubject
。他没有内在multicast()
,他的结果是可以观察到的,不是ConnectableObservable
。可以使用它,也可以不使用refCount
它。这是两个选项:
interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 })
)
interval(1000).pipe(
shareReplay(1)
)
shareReplay
与{ refCount: false }
通话
时就像在通话shareReplay(x)
。在这种情况下,将不会进行引用计数。这意味着在原始流完成之前,shareReplay
将对其进行订阅,而不管其本身是否具有最终订阅者。所有新订户将收到最后x个值。shareReplay vs publishReplay + refCount
乍一看,它是shareReplay({ refCount: true, bufferSize: X })
相同的publishReplay(X) + refCount()
,但这并非完全正确。让我们看看有什么相似之处和有什么区别。它们具有相同的行为refCount
-根据订阅者的数量订阅和取消订阅原始流。当原始流完成时,它们的反应也相同-所有新订户都收到X个最后一个值。但是,如果原始流尚未完成,则在这种情况下publishReplay(X) + refCount()
-所有新订阅者都从缓冲区接收X值,然后将使用相同的值重新签名ReplaySubject
。但是,如果我们使用shareReplay({ refCount: true, bufferSize: 1 })
最后一个X值,则它们将无法获得它,因为在它的内部会创建一个新 值,ReplaySubject
并使用它来重新订阅源。示例说明了这一点:const source = interval(1000).pipe(
publishReplay(1),
refCount()
);
const one = source.subscribe(observer('subcriber-1'));
setTimeout(() => {
one.unsubscribe();
const two = source.subscribe(observer('subcriber-2'));
}, 3000);
const source = interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 })
);
const one = source.subscribe(observer('subcriber-1'));
setTimeout(() => {
one.unsubscribe();
const two = source.subscribe(observer('subcriber-2'));
}, 3000);


Angular中的实际示例
让我们看看如何在战斗条件下使用研究过的多播运营商。我们使用分享
假设我们有一个需要原始流中的数据的组件。它可以是http请求,状态或其他任何内容。而且我们还需要数据处理,例如过滤,排序等。@Component({
template: `
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
);
}
}
现在,我们需要另一个仅显示第一个用户的组件。如果我们按原样订阅源流,那么:@Component({
template: `
<user [user]="firstUser$ | async"></user>
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
firstUser$: Observable<User>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
);
this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
}
}
现在我们有两个http请求,排序或过滤操作将执行两次。我们申请share
:@Component({
template: `
<user [user]="firstUser$ | async"></user>
<users-list [users]="allUsers$ | async"></users-list>
`,
})
export class UsersPageComponent {
allUsers$: Observable<User[]>;
firstUser$: Observable<User>;
constructor(private http: HttpClient) {
}
ngOnInit() {
this.allUsers$ = this.http.get('https://api/users').pipe(
map(users => filter/sort),
share()
);
this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
}
}
我们已经知道他创建了一个新的Subject
订阅源。当源发出时,主题会将这个值传递给它的所有订阅者。问题已解决,当我们订阅时firstUser$
-我们订阅了internal subject
,而不是直接订阅了原始流。使用ShareReplay
ShareReplay
当您需要发出,缓存和重复最后的X个值时适用。一个典型的示例是执行http请求的单例服务。
@Injectable({ providedIn: 'root' })
export class BlogService {
posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
.pipe(shareReplay(1));
constructor(private http: HttpClient) {}
}
不论现在或将来有多少组件将请求数据,都将只有一个http请求,并且结果将保存在内部缓冲区中ReplaySubject
。在某些情况下,由于没有订阅者,您需要取消未完成的请求,然后您需要申请refCount
。完整的代码可以在这里找到。