Kami mempelajari operator multicast RxJS

Halo, Habr! Saya mempersembahkan untuk Anda terjemahan artikel "Memahami Operator Multicast RxJS" oleh Netanel Basal.

Operator siaran atau multicast sering tampak seperti topik tersulit untuk dipelajari tentang RxJS. Pada artikel ini saya akan mencoba menjelaskan semuanya dengan cara yang dapat diakses.

Kami akan mempertimbangkan struktur internal operator multicast dan tugas yang mereka selesaikan.

Mari kita mulai dengan menggambarkan blok bangunan dasar RxJS.

Tampak


Dalam RxJS, objek yang dapat diamati (selanjutnya disebut sebagai "aliran") pada awalnya dingin. Ini berarti bahwa setiap kali Anda berlangganan aliran, panggilan balik berlangganan dilakukan.

Untuk pemahaman yang lebih baik, buat implementasi berikut:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

Konstruktor Observablemenerima satu parameter - panggilan balik langganan
subscriptionFn. Ini akan dipanggil setiap kali kita berlangganan aliran ( subscribe()).

Kadang-kadang mereka juga memanggil panggilan balik berlangganan producer, karena juga "menghasilkan" nilai untuk pelanggan (objek pengamat dalam kode kami).

Metode ini subscribe()mengambil input observer. Ini adalah obyek dengan tiga metode sendiri: next(), error(), complete(). Dalam live RxJS, Anda dapat melewati tiga fungsi, bukan objek.

Metode ini, subscribe()ketika dipanggil, memanggil fungsi berlangganan untuk meneruskannya ke input observer.

Kami tidak menyebutkan metodenya sekarangunsubscribe, tetapi harus diingat bahwa setiap langganan menyediakan cara untuk menghancurkannya. Paling sering, langganan mengembalikan fungsi (atau objek dengan metode yang sesuai), selama koneksi antara aliran dan pelanggannya dihancurkan.

Ini semua sangat sederhana. Mari kita bergerak lebih dekat dengan kenyataan sekarang. Misalnya, bungkus API XHR asli dalam aliran


function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });
    xhr.open('GET', url);
    xhr.send();
    
    return () => xhr.abort()
  }
  
  return new Observable(subscriptionFn);
}

Kami menulis fungsi httpyang menerima URL, mengeksekusi permintaan http, dan mengembalikan aliran yang memancarkan respons http yang diterima.

Sekarang, melihat implementasi kami, menurut Anda apa yang akan terjadi ketika kami berlangganan aliran ini dua kali?


// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  }
});

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

Dengan benar, dua permintaan http akan dieksekusi. Jika kita melihat lagi implementasi dari kelas yang Dapat Diobservasi, kita akan melihat mengapa demikian. Setiap pelanggan panggilan balik berlangganan, yang pada gilirannya melakukan permintaan http setiap kali.



Operator


Operator adalah fungsi yang mengambil aliran sebagai input, melakukan tindakan apa pun, dan mengembalikan aliran.

Kami akan menulis operator pertama kami sendiri.
function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
        next(value) {
          observer.next(fn(value));
        }
      });
    });
  };
}

Fungsi map()mengembalikan operator yang menerima aliran asli dan mengembalikan aliran di mana semua nilai yang lewat akan melewati fungsi fn.

Itu di dalamnya selalu ada langganan ke input stream.

Sebelum menggunakan operator baru ini, kita harus melampirkannya entah bagaimana ke aliran. Perpanjang kelas kami Observabledenganpipe()

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

Metode sederhana, hanya satu baris kode. pipeDibutuhkan array operator dan memanggil mereka pada gilirannya, melewati setiap input hasil dari yang sebelumnya.

Mari kita gunakan operator kami:

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

Saat dipanggil subscribe, berlangganan ke aliran output akan dieksekusi map(), dan pada gilirannya map, berlangganan ke aliran asli akan dieksekusi di dalam .

Aliran http memancarkan nilai yang jatuh ke dalamnya map. Kemudian, fungsi dieksekusi fn, aliran dari mapmemancarkan nilai ke langganan akhir. Ini bekerja seperti observable chainrantai utas.



Jika kami berlangganan sebuah rantai dua kali, setiap langganan dalam rantai itu akan dipanggil dua kali.

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
    .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));



Dan apakah perilaku ini tidak cocok untuk kita? Jika kami ingin memanggil fungsi berlangganan hanya sekali, berapa banyak langganan yang akan kami miliki?

Misalnya, bagaimana jika kita ingin membuat satu permintaan http dan menggunakan hasilnya untuk semua pelanggan? Dalam hal ini yang Anda butuhkan Subject.

Subjek


Subjectitu adalah aliran dan pelanggan. Aliran - karena memiliki metode subscribe(), pelanggan - karena menerapkan antarmuka pelanggan - metode next(), error(), complete().

Mari kita menulisnya.

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

Subjectdapat bertindak sebagai perantara antara arus dingin dan banyak pelanggan.

Ubah contoh kami sebagai berikut:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

Saat dipanggil subject.subscribe(someFn), hanya satu operasi sederhana yang dilakukan - menambahkan subject.observersfungsi ke array someFn.

Nah, kemudian, karena Subjectberperilaku sebagai pelanggan juga, Anda dapat berlangganan ke aliran asli, mis. ketika utas asli mengeluarkan nilai, itu disebut subject.next(), yang mensyaratkan transfer nilai ini ke masing-masing pelanggan subject.

Sekarang kami memiliki panggilan balik asli dari langganan yang dieksekusi satu kali, dan hanya satu permintaan http yang akan dieksekusi.



Pesta terlambat


Apa yang terjadi jika aliran asli sudah berfungsi sebelum kami mendaftar?

Ini tidak akan mungkin untuk menunjukkan ini dalam contoh sebelumnya, karena http asinkron, bahkan jika Anda berlangganan segera setelah itu, nilainya akan tetap datang setelah berlangganan.

Mari cepat membuat fungsi menghasilkan of:


function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

Aliran yang dibuat dengan cara of()memancarkan nilai secara sinkron, satu demi satu. Kami akan berlangganan subjectsetelah berlangganan dari.

const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Pelanggan kami belum menerima apa pun. Mengapa? Implementasi kami tidak mendukung pelanggan "terlambat". Ketika aliran asli dari of()nilai yang dipancarkan, pelanggan belum terdaftar, nilai-nilai ini tidak akan ke mana-mana.

Dalam contoh aktual pada Angular, mungkin sumber arus berfungsi, tetapi komponen Anda belum ada di halaman. Dan ketika komponen muncul, ia berlangganan ke sumbernya, tetapi tidak menerima nilai-nilai yang telah berlalu.

Salah satu cara untuk menyelesaikan masalah adalah ini ReplaySubject. Kami menguraikan versinya dan melihat cara kerjanya.


class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

Konsepnya sederhana. Seperti namanya, ReplaySubjectini adalah yang khusus Subjectyang dapat mereproduksi nilai lama untuk semua pelanggan baru.

Setiap nilai yang dirilis akan ditransfer ke semua pelanggan saat ini dan disimpan untuk yang akan datang, ukuran buffer bufferSizeditetapkan dalam konstruktor.

Tulis ulang contoh sebelumnya dengan ReplaySubject.

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Hasilnya telah berubah.

Meskipun terlambat berlangganan, kami menangkap semuanya.



Ringkasnya, tujuannya ReplaySubjectadalah distribusi nilai untuk semua pelanggan dan menyimpannya untuk pelanggan "terlambat" di masa mendatang.

Sebelum melanjutkan, saya sarankan Anda mencoba menulis implementasi Anda sendiri BehaviorSubject. Anda dapat menemukan kode selesai di akhir artikel.

Sekarang kami akhirnya beralih ke operator multicast. Saya harap contoh di atas akan membantu Anda memahaminya lebih cepat.

Operator multicast


Multicast dan Connect


Operator multicast() menggunakan Subjectuntuk mengeluarkan aliran sumber ke beberapa pelanggan.


import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

multicastmengembalikan objek ConnectableObservableyang memiliki metode connect. Tujuannya adalah untuk berlangganan subjek yang diterima ke aliran sumber.

Metode ini connectmemungkinkan kami untuk menentukan kapan harus memulai pelaksanaan utas asli. Ada saat yang perlu diingat - untuk berhenti berlangganan dari sumber yang perlu Anda lakukan:

connectableSubscription.unsubscribe();

Kami tidak terbatas pada yang sederhana Subject. Sebagai gantinya, Anda bisa menggunakan kelas turunan apa pun, misalnya ReplaySubject:

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

Dari kode ini Anda bisa menebak apa yang akan terjadi di bawah tenda.

Ketika kami menggunakan multicast, kami dapat mentransfer tidak hanya Subject, tetapi juga fungsi pabrik, yang mengembalikan yang baru setiap kali Subject.

Digunakan kembali sudah selesai subjecttidak bisa, fungsi pabrik memecahkan masalah ini.

interval(1000).pipe(
  multicast(() => new Subject())
)

Refount


Saat kami menggunakan operator multicast(), kami bertanggung jawab atas panggilan connect()untuk memulai eksekusi yang asli yang dapat diamati. Selain itu, kami masih harus memantau kemungkinan kebocoran memori, berhenti berlangganan secara manual ConnectableSubscription.

Otomatisasi proses akan menghindari kesalahan dan menyederhanakan kode. Pengembang RxJS yang baik memikirkannya untuk kami dan menciptakan refCountoperator.

refCountmenghitung langganan dan ketika yang pertama muncul, ia memanggil connect(), mis. berlangganan. Ketika berkurang kembali ke nol, respons akan dipanggil.

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)
 
// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubcribe();
}, 3000)

Perhatikan bahwa setelah refCountkita mendapatkan yang biasa diamati, tidak ConnectableObservable.

Publikasikan dan variasinya


multicast() + Subject + refCount()ini adalah kasus yang cukup khas di RxJS dan pengembang telah menguranginya menjadi satu operator.

Mari kita lihat opsi apa yang kita miliki.

  • publish() setara multicast(() => new Subject())
    const connectableObservable = interval(1000).pipe(
      publish()
    )
    
    connectableObservable.connect();
    

  • publishBehavior() setara multicast(new BehaviorSubject())
    const connectableObservable = interval(1000).pipe(
      publishBehavior(100)
    )
    
    connectableObservable.connect();
    

  • publishReplay() setara multicast(() => new ReplaySubject(x))
    const connectableObservable = interval(1000).pipe(
      publishReplay(3)
    )
    
    connectableObservable.connect();
    

  • publishLast() setara multicast(new AsyncSubject())
    const connectableObservable = interval(1000).pipe(
      take(2),
      publishLast()
    )
    
    connectableObservable.connect();
    

  • share() setara multicast(() => new Subject()) + refCount()
    const source = interval(1000).pipe(
      share()
    )
    

  • shareReplay(bufferSize) Ini adalah operator multicast yang menggunakan ReplaySubject. Dia tidak memiliki bagian dalam multicast()dan hasilnya dapat diamati, bukan ConnectableObservable. Itu dapat digunakan dengan refCountatau tanpa itu. Berikut adalah dua opsi:

    interval(1000).pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    )
    
    interval(1000).pipe(
      shareReplay(1)
    )
    

Saat shareReplaydipanggil dengan { refCount: false }seperti menelepon shareReplay(x).

Dalam hal ini, tidak akan ada penghitungan referensi. Ini berarti bahwa sampai aliran asli selesai, ia shareReplayakan berlangganan kepadanya, terlepas dari apakah itu sendiri memiliki pelanggan akhir atau tidak. Semua pelanggan baru akan menerima nilai x terakhir.

shareReplay vs publishReplay + refCount


Sepintas , itu shareReplay({ refCount: true, bufferSize: X })identik publishReplay(X) + refCount() , tetapi ini tidak sepenuhnya benar.

Mari kita lihat apa persamaannya dan apa bedanya.

Mereka memiliki perilaku yang sama refCount- berlangganan dan berhenti berlangganan dari aliran asli berdasarkan jumlah pelanggan. Mereka juga bereaksi sama ketika aliran asli selesai - semua pelanggan baru menerima nilai X terakhir.

Namun, jika aliran asli belum selesai, dalam hal ini ketika kita memiliki publishReplay(X) + refCount()- semua pelanggan baru menerima nilai X dari buffer, dan kemudian akan ditandatangani kembali menggunakan yang sama ReplaySubject.
Tetapi jika kita menggunakan nilai shareReplay({ refCount: true, bufferSize: 1 })X terakhir, mereka tidak akan mendapatkannya, karena di dalamnya menciptakan yang baru ReplaySubject dan menggunakannya untuk berlangganan kembali ke sumbernya.

Contoh yang menggambarkan ini:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
 
  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
  
  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);





Contoh aktual dalam Angular


Mari kita lihat bagaimana menggunakan operator multicast yang dipelajari dalam kondisi pertempuran.

Kami menggunakan share


Misalkan kita memiliki komponen yang membutuhkan data dari aliran asli. Ini bisa berupa permintaan http, status, atau apa pun. Dan kita juga membutuhkan manipulasi data, seperti pemfilteran, pengurutan, dll.

@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
  }
}

Dan sekarang kita membutuhkan komponen lain yang hanya menunjukkan pengguna pertama. Jika kita berlangganan ke sumber arus apa adanya, maka:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Dan sekarang kami memiliki dua permintaan http, operasi penyortiran atau penyaringan akan dilakukan dua kali.
Kami menerapkan share:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
      share()
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Kita sudah tahu bahwa dia membuat yang baru Subjectyang berlangganan ke sumbernya. Saat sumber memancarkan, subjek meneruskan nilai ini ke semua pelanggannya.

Masalahnya teratasi, dan ketika kami berlangganan firstUser$- kami berlangganan internal subject, dan bukan aliran asli secara langsung.

Menggunakan ShareReplay


ShareReplayberlaku ketika Anda perlu memancarkan, cache dan ulangi nilai X terakhir. Contoh khas adalah layanan tunggal yang melakukan permintaan http.


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
              .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}

Tidak masalah berapa banyak komponen yang akan meminta data sekarang atau di masa depan, hanya akan ada satu permintaan http dan hasilnya akan disimpan dalam buffer internal ReplaySubject.
Mungkin masih ada kasus di mana Anda perlu membatalkan permintaan yang tidak lengkap, karena tidak ada pelanggan, maka Anda harus mendaftar refCount.

Kode lengkap dapat ditemukan di sini .

All Articles