我们研究多播RxJS运算符

哈Ha!我向您呈现Netanel Basal 撰写的文章“ Understanding RxJS Multicast Operators”的翻译

广播或多播运算符通常似乎是学习RxJS的最难的话题。在本文中,我将尝试以一种易于访问的方式来解释一切。

我们将考虑多播运营商的内部结构及其解决的任务。

让我们从描述RxJS的基本构建块开始。

可观察的


在RxJS中,可观察对象(以下称为“流”)最初是冷的。这意味着每次您订阅流时,都会执行订阅的回调。

为了更好地理解,请创建以下实现:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

构造函数Observable接受一个参数-订阅的回调
subscriptionFn。每次我们订阅流(subscribe()都会调用它

有时他们会调用订阅的回调producer,因为它还会为订阅者(我们代码中的观察者对象)``产生''值。

该方法subscribe()接受输入observer。它是一个具有以下三种方法的对象:next(), error(), complete()。在实时RxJS中,您可以传递三个函数而不是对象。

该方法subscribe()在调用时会调用订阅函数,将其传递给input observer

我们现在没有提到该方法unsubscribe,但请记住,每个订阅都提供了一种销毁它的方法。通常,订阅返回一个函数(或具有适当方法的对象),在此过程中,流及其订阅者之间的连接被破坏。

这一切都非常简单。现在让我们更接近现实。例如,将本机XHR API包装在流中


function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });
    xhr.open('GET', url);
    xhr.send();
    
    return () => xhr.abort()
  }
  
  return new Observable(subscriptionFn);
}

我们编写了一个函数http该函数接收URL,执行http请求并返回发出接收到的http响应的流。

现在,看看我们的实现,当我们两次订阅此流时,您认为会发生什么?


// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  }
});

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

正确地,将执行两个http请求。如果我们再看一下Observable类的实现,我们将明白为什么会这样。每个订阅者都调用一个订阅回调,该回调又每次都执行一个http请求。



经营者


运算符是将流作为输入,执行任何操作并返回流的函数。

我们将编写我们自己的第一个运算符。
function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
        next(value) {
          observer.next(fn(value));
        }
      });
    });
  };
}

该函数map()返回一个接受原始流的运算符,并返回其中所有传递值都将通过该函数传递的流fn

那些。在其中始终有对输入流的订阅。

在使用此新运算符之前,我们需要以某种方式将其附加到流中。扩展我们班Observablepipe()

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

一种简单的方法,只需一行代码。pipe它需要一组运算符并依次调用它们,并将每个输入的结果传递给前一个结果。

让我们使用我们的运算符:

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

调用时subscribe,将执行对输出流的订阅map(),然后map,将在内部执行对原始流的订阅。

http流发出其所属的值map然后,执行该函数fn,流从map发出值到最终订阅。它像observable chain线程链一样工作。



如果我们两次订阅一个链,则该链中的每个订阅都会被调用两次。

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
    .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));



如果这种行为不适合我们?如果我们只想调用一次订阅功能,那么我们将拥有多少个订阅?

例如,如果我们要发出一个http请求并将结果用于所有订阅者,该怎么办?在这种情况下,您需要Subject

科目


Subject它既是流又是订户。流-因为它有一个方法subscribe(),订户-因为它实现了订户接口-方法next(), error(), complete().

让我们来编写它。

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

Subject可以充当冷流和许多订户之间的中介。

更改我们的示例,如下所示:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

调用时subject.subscribe(someFn),仅执行一个简单的操作-向数组添加一个subject.observers函数someFn

那么,既然它也Subject充当订阅者,那么您可以将其订阅到原始流,即 当原始线程发出一个值时,它将被调用subject.next(),这需要将该值传输到每个订户subject

现在我们已经执行了订阅的原始回调一次,并且只会执行一个http请求。



派对晚宴者


如果原始流在我们注册之前已经起作用了,该怎么办?

在上一个示例中将无法显示此内容,因为http是异步的,即使您在紧随其后进行预订,该值仍将在预订之后出现。

让我们快速创建一个生成函数of


function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

通过手段创建的流of()会一个接一个地发出值。我们subject已经订阅了,我们将对其进行订阅。

const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

我们的订户没有收到任何东西。为什么?我们的实现不支持“后期”订阅者。当原始流从中of()发出值时,尚未注册订阅者,这些值将无处可寻。

在Angular上的实际示例中,很可能是源流起作用了,但是页面上还没有组件。并且当组件出现时,它订阅源,但不接收已经传递的值。

解决问题的一种方法是这样ReplaySubject。我们概述其版本,并查看其工作方式。


class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

这个概念很简单。顾名思义,ReplaySubject这是一种特殊的功能Subject,可以为所有新订户重现旧值。

每个释放的值将被转移到所有当前的订户,并保存以备将来使用,缓冲区大小bufferSize在构造函数中设置。

用重写前面的示例ReplaySubject

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

结果已更改。

尽管订阅较晚,但我们都抓住了他们。



总而言之,目的ReplaySubject是将价值分配给所有订户并将其缓存给将来的``后期''订户。

在继续之前,建议您尝试编写自己的实现BehaviorSubject您可以在文章末尾找到完成的代码。

现在,我们终于转到多播运算符。希望以上示例可以帮助您更快地理解它们。

组播运营商


组播和连接


运营商multicast() 用于Subject将源流发布给多个订户。


import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

multicast返回ConnectableObservable具有方法的对象connect其目的是为接收到的主题订阅源流。

该方法connect使我们能够确定何时开始执行原始线程。有个时刻需要牢记-取消订阅您需要做的消息:

connectableSubscription.unsubscribe();

我们不仅限于简单Subject相反,您可以使用任何派生类,例如ReplaySubject

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

从此代码中,您可以猜测幕后情况。

使用时multicast,我们不仅可以传递Subject工厂函数,还可以传递工厂函数,该函数每次都会返回一个新函数Subject

重用已经完成的subject不能,工厂功能解决了这个问题。

interval(1000).pipe(
  multicast(() => new Subject())
)

翻新


当我们使用operator时multicast(),我们负责调用connect()以开始执行原始的可观察对象。另外,我们仍然必须监视可能的内存泄漏,并从中手动退订ConnectableSubscription

流程的自动化将避免错误并简化代码。善良的RxJS开发人员为我们考虑了这一点,并创建了一个refCount运算符。

refCount计算订阅数,当第一个出现时,它会调用connect(),即 订阅。当它减少回零时,将调用一个响应。

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)
 
// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubcribe();
}, 3000)

请注意,在refCount得到通常的可观察值之后,不是ConnectableObservable

发布及其选项


multicast() + Subject + refCount()这在RxJS中是相当典型的情况,开发人员已将其简化为单个运算符。

让我们看看我们有哪些选择。

  • publish() 当量 multicast(() => new Subject())
    const connectableObservable = interval(1000).pipe(
      publish()
    )
    
    connectableObservable.connect();
    

  • publishBehavior() 当量 multicast(new BehaviorSubject())
    const connectableObservable = interval(1000).pipe(
      publishBehavior(100)
    )
    
    connectableObservable.connect();
    

  • publishReplay() 当量 multicast(() => new ReplaySubject(x))
    const connectableObservable = interval(1000).pipe(
      publishReplay(3)
    )
    
    connectableObservable.connect();
    

  • publishLast() 当量 multicast(new AsyncSubject())
    const connectableObservable = interval(1000).pipe(
      take(2),
      publishLast()
    )
    
    connectableObservable.connect();
    

  • share() 当量 multicast(() => new Subject()) + refCount()
    const source = interval(1000).pipe(
      share()
    )
    

  • shareReplay(bufferSize) 这是使用的多播运算符ReplaySubject他没有内在multicast(),他的结果是可以观察到的,不是ConnectableObservable可以使用它,也可以不使用refCount它。这是两个选项:

    interval(1000).pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    )
    
    interval(1000).pipe(
      shareReplay(1)
    )
    

shareReplay{ refCount: false }通话就像在通话shareReplay(x)

在这种情况下,将不会进行引用计数。这意味着在原始流完成之前,shareReplay将对其进行订阅,而不管其本身是否具有最终订阅者。所有新订户将收到最后x个值。

shareReplay vs publishReplay + refCount


乍一看,它是shareReplay({ refCount: true, bufferSize: X })相同的publishReplay(X) + refCount() ,但这并非完全正确。

让我们看看有什么相似之处和有什么区别。

它们具有相同的行为refCount-根据订阅者的数量订阅和取消订阅原始流。当原始流完成时,它们的反应也相同-所有新订户都收到X个最后一个值。

但是,如果原始流尚未完成,则在这种情况下publishReplay(X) + refCount()-所有新订阅者都从缓冲区接收X值,然后将使用相同的值重新签名ReplaySubject
但是,如果我们使用shareReplay({ refCount: true, bufferSize: 1 })最后一个X值,则它们将无法获得它,因为在它的内部会创建一个 值,ReplaySubject并使用它来重新订阅源。

示例说明了这一点:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
 
  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
  
  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);





Angular中的实际示例


让我们看看如何在战斗条件下使用研究过的多播运营商。

我们使用分享


假设我们有一个需要原始流中的数据的组件。它可以是http请求,状态或其他任何内容。而且我们还需要数据处理,例如过滤,排序等。

@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
  }
}

现在,我们需要另一个仅显示第一个用户的组件。如果我们按原样订阅源流,那么:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

现在我们有两个http请求,排序或过滤操作将执行两次。
我们申请share

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
      share()
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

我们已经知道他创建了一个新的Subject订阅源。当源发出时,主题会将这个值传递给它的所有订阅者。

问题已解决,当我们订阅时firstUser$-我们订阅了internal subject,而不是直接订阅了原始流。

使用ShareReplay


ShareReplay当您需要发出,缓存和重复最后的X个值时适用。一个典型的示例是执行http请求的单例服务。


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
              .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}

不论现在或将来有多少组件将请求数据,都将只有一个http请求,并且结果将保存在内部缓冲区中ReplaySubject
在某些情况下,由于没有订阅者,您需要取消未完成的请求,然后您需要申请refCount

完整的代码可以在这里找到

All Articles