نحن ندرس مشغلي RxJS المتعدد البث

مرحبا يا هابر! أقدم لكم ترجمة المقال "فهم مشغلي الإرسال المتعدد RxJS" لنتانيل بصل.

غالبًا ما يبدو مشغلو البث أو الإرسال المتعدد أصعب موضوع يمكن تعلمه عن RxJS. في هذه المقالة سأحاول شرح كل شيء بطريقة يمكن الوصول إليها.

سننظر في الهيكل الداخلي لمشغلي الإرسال المتعدد والمهام التي يقومون بحلها.

لنبدأ بوصف كتل البناء الأساسية لـ RxJS.

ملحوظة


في RxJS ، تكون الأجسام التي يمكن ملاحظتها (يشار إليها فيما يلي باسم "الجداول") في البداية باردة. هذا يعني أنه في كل مرة تشترك فيها في دفق ، يتم إجراء رد على الاشتراك.

لفهم أفضل ، قم بإنشاء التنفيذ التالي:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

Observableيقبل المنشئ معلمة واحدة - رد الاشتراك
subscriptionFn. سيتم استدعاؤه في كل مرة اشترك فيها في الدفق ( subscribe()).

في بعض الأحيان يسمونه أيضًا رد اتصال لاشتراك producer، لأنه "ينتج" أيضًا قيمًا للمشترك (كائن مراقب في الكود الخاص بنا). تأخذ

الطريقة subscribe()مدخلات observer. ومن وجوه مع ثلاث طرق الخاصة: next(), error(), complete(). في RxJS الحية ، يمكنك تمرير ثلاث وظائف بدلاً من كائن. تستدعي

الطريقة ، subscribe()عند استدعاء ، وظيفة الاشتراك لتمريرها إلى الإدخال observer.

لم نذكر الطريقة الآنunsubscribe، ولكن يجب أن يوضع في الاعتبار أن كل اشتراك يوفر طريقة لتدميره. غالبًا ما يُرجع الاشتراك دالة (أو كائنًا باستخدام الطريقة المناسبة) ، يتم خلالها تدمير الاتصال بين الدفق والمشتركين فيه.

هذا كله بسيط جدا دعنا نقترب من الواقع الآن. على سبيل المثال ، لف واجهة برمجة تطبيقات XHR أصلية في دفق


function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });
    xhr.open('GET', url);
    xhr.send();
    
    return () => xhr.abort()
  }
  
  return new Observable(subscriptionFn);
}

كتبنا دالة httpتتلقى عنوان URL ، وتنفذ طلب http ، وترجع دفقًا ينبعث منه رد http المتلقى.

الآن ، بالنظر إلى تطبيقنا ، ماذا تعتقد أنه سيحدث عندما نشترك في هذا التدفق مرتين؟


// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  }
});

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

بشكل صحيح ، سيتم تنفيذ طلبين http. إذا نظرنا مرة أخرى في تنفيذ فئة الملاحظة ، فسوف نرى سبب ذلك. يقوم كل مشترك باستدعاء رد اتصال للاشتراك ، والذي بدوره يقوم بطلب http في كل مرة.



العاملين


العامل هو وظيفة تأخذ الدفق كمدخل ، وتقوم بأي إجراء ، وترجع الدفق.

سنكتب أول مشغل لدينا.
function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
        next(value) {
          observer.next(fn(value));
        }
      });
    });
  };
}

تقوم الدالة map()بإرجاع عامل تشغيل يقبل الدفق الأصلي وإرجاع دفق حيث سيتم تمرير كافة قيم التمرير عبر الدالة fn.

أولئك. بداخله يوجد دائمًا اشتراك في دفق الإدخال.

قبل استخدام هذا المشغل الجديد ، نحتاج إلى إرفاقه بطريقة أو بأخرى بالدفق. توسيع الصف لدينا Observableمن قبلpipe()

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

طريقة بسيطة ، سطر واحد فقط من التعليمات البرمجية. pipeيستغرق الأمر مجموعة من العوامل ويدعوها بدورها ، لتمرير كل إدخال نتيجة السابقة.

دعونا نستخدم عامل التشغيل لدينا:

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

عند الاستدعاء subscribe، سيتم تنفيذ الاشتراك في دفق الإخراج map()، وفي المقابل map، سيتم تنفيذ الاشتراك في الدفق الأصلي في الداخل .

ينبثق دفق http القيمة التي يقع فيها map. ثم يتم تنفيذ الوظيفة fn، حيث mapينبعث الدفق من القيمة إلى الاشتراك النهائي. يعمل مثل observable chainسلسلة خيط.



إذا اشتركنا في سلسلة مرتين ، فسيتم استدعاء كل اشتراك في السلسلة مرتين.

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
    .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));



وإذا كان هذا السلوك لا يناسبنا؟ إذا أردنا استدعاء وظيفة الاشتراك مرة واحدة فقط ، فكم عدد الاشتراكات التي سنحصل عليها؟

على سبيل المثال ، ماذا لو أردنا تقديم طلب http واحد واستخدام النتيجة لجميع المشتركين؟ في هذه الحالة تحتاج Subject.

المواضيع


Subjectهو دفق ومشترك. التدفق - لأنه يحتوي على طريقة subscribe()، المشترك - لأنه يطبق واجهة المشترك - طرق next(), error(), complete().

دعنا نكتبها.

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

Subjectيمكن أن تعمل كوسيط بين التيار البارد والعديد من المشتركين.

قم بتغيير مثالنا على النحو التالي:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

عند الاستدعاء subject.subscribe(someFn)، يتم تنفيذ عملية واحدة بسيطة - إضافة subject.observersدالة إلى الصفيف someFn.

حسنًا ، نظرًا لأنه يعمل Subjectكمشترك أيضًا ، يمكنك الاشتراك في ساحة المشاركات الأصلية ، أي عندما يصدر الخيط الأصلي قيمة ، يطلق عليه subject.next()، مما يستتبع نقل هذه القيمة إلى كل مشترك subject.

الآن لدينا تنفيذ الاستدعاء الأصلي للاشتراك مرة واحدة ، وسيتم تنفيذ طلب http واحد فقط.



المتأخرون في الحفلات


ماذا يحدث إذا كان البث الأصلي يعمل بالفعل قبل الاشتراك؟

لن يكون من الممكن إظهار ذلك في المثال السابق ، نظرًا لأن http غير متزامن ، حتى إذا قمت بالاشتراك فيه فورًا ، فستظل القيمة تأتي بعد الاشتراك.

دعونا ننشئ بسرعة وظيفة توليد of:


function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

الدفق الذي تم إنشاؤه عن طريق of()بعث القيم بشكل متزامن ، واحدا تلو الآخر. سنقوم بالاشتراك subjectبعد الاشتراك في بالفعل.

const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

لم يتلق المشتركون لدينا أي شيء. لماذا ا؟ تطبيقنا لا يدعم المشتركين "المتأخرين". عندما يكون الدفق الأصلي من of()قيم الانبعاثات ، لم يتم تسجيل المشتركين حتى الآن ، لن تذهب هذه القيم إلى أي مكان.

في الأمثلة الفعلية على Angular ، قد يكون دفق المصدر ناجحًا ، ولكن المكون الخاص بك لم يظهر بعد على الصفحة. وعندما يظهر المكون ، فإنه يشترك في المصدر ، لكنه لا يتلقى القيم التي مرت بالفعل.

طريقة واحدة لحل المشكلة هي هذه ReplaySubject. نوجز نسخته ونرى كيف يعمل.


class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

المفهوم بسيط. كما يوحي الاسم ، ReplaySubjectهذه Subjectواحدة خاصة يمكنها إعادة إنتاج القيم القديمة لجميع المشتركين الجدد.

سيتم نقل كل قيمة تم إصدارها إلى جميع المشتركين الحاليين وحفظها للمستقبلين ، ويتم bufferSizeتعيين حجم المخزن المؤقت في المُنشئ.

أعد كتابة المثال السابق ب ReplaySubject.

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

لقد تغيرت النتيجة.

على الرغم من الاشتراك المتأخر ، أمسكنا بهم جميعًا.



باختصار ، الغرض ReplaySubjectهو توزيع القيم على جميع المشتركين وتخزينها مؤقتًا للمشتركين "المتأخرين" في المستقبل.

قبل المضي قدمًا ، أوصيك بمحاولة كتابة التنفيذ الخاص بك BehaviorSubject. يمكنك العثور على الرمز النهائي في نهاية المقالة.

الآن ننتقل أخيرًا إلى مشغلي الإرسال المتعدد. آمل أن تساعدك الأمثلة المذكورة أعلاه على فهمها بشكل أسرع.

مشغلي الإرسال المتعدد


الإرسال المتعدد والاتصال


multicast() يستخدم عامل التشغيل Subjectلإصدار دفق المصدر إلى العديد من المشتركين.


import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

multicastإرجاع كائن ConnectableObservableله أسلوب connect. والغرض منه هو الاشتراك في الموضوع المستلم في دفق المصدر. تتيح لنا

الطريقة connectتحديد موعد بدء تنفيذ سلسلة المحادثات الأصلية. هناك لحظة يجب وضعها في الاعتبار - لإلغاء الاشتراك من المصدر الذي عليك القيام به:

connectableSubscription.unsubscribe();

نحن لا نقتصر على البساطة Subject. بدلاً من ذلك ، يمكنك استخدام أي فئة مشتقة ، على سبيل المثال ReplaySubject:

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

من هذا الكود يمكنك تخمين ما سيحدث تحت غطاء المحرك.

عندما نستخدمها multicast، يمكننا نقل ليس فقط Subjectوظيفة المصنع ، والتي ترجع وظيفة جديدة في كل مرة Subject. لا يمكن

إعادة استخدامها المكتملة بالفعل subject، وظيفة المصنع تحل هذه المشكلة.

interval(1000).pipe(
  multicast(() => new Subject())
)

إعادة تحميل


عندما نستخدم عامل التشغيل multicast()، نحن مسؤولون عن المكالمة connect()لبدء تنفيذ الملاحظة الأصلية. بالإضافة إلى ذلك ، لا يزال يتعين علينا مراقبة التسرب المحتمل للذاكرة ، وإلغاء الاشتراك يدويًا ConnectableSubscription.

إن أتمتة العملية ستتجنب الأخطاء وتبسط الكود. فكر مطورو RxJS اللطيف في ذلك بالنسبة لنا وأنشأوا refCountعامل تشغيل.

refCountيحسب الاشتراكات وعندما يظهر الأول ، فإنه يتصل connect()، أي يشترك. عندما ينخفض ​​إلى الصفر ، سيتم استدعاء استجابة.

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)
 
// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubcribe();
}, 3000)

لاحظ أنه بعد أن refCountنحصل على المعتاد ، لا ConnectableObservable.

نشر وخياراته


multicast() + Subject + refCount()هذه حالة نموذجية إلى حد ما في RxJS وقام المطورون بتقليصها إلى مشغل واحد.

دعونا نرى ما الخيارات المتاحة لدينا.

  • publish() ما يعادل multicast(() => new Subject())
    const connectableObservable = interval(1000).pipe(
      publish()
    )
    
    connectableObservable.connect();
    

  • publishBehavior() ما يعادل multicast(new BehaviorSubject())
    const connectableObservable = interval(1000).pipe(
      publishBehavior(100)
    )
    
    connectableObservable.connect();
    

  • publishReplay() ما يعادل multicast(() => new ReplaySubject(x))
    const connectableObservable = interval(1000).pipe(
      publishReplay(3)
    )
    
    connectableObservable.connect();
    

  • publishLast() ما يعادل multicast(new AsyncSubject())
    const connectableObservable = interval(1000).pipe(
      take(2),
      publishLast()
    )
    
    connectableObservable.connect();
    

  • share() ما يعادل multicast(() => new Subject()) + refCount()
    const source = interval(1000).pipe(
      share()
    )
    

  • shareReplay(bufferSize) هذا عامل تشغيل متعدد الإرسال يستخدم ReplaySubject. ليس لديه داخل multicast()ونتائجه ملحوظة ، لا ConnectableObservable. يمكن استخدامه معها refCountأو بدونها. إليك كلا الخيارين:

    interval(1000).pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    )
    
    interval(1000).pipe(
      shareReplay(1)
    )
    

عند الاتصال shareReplayبـ { refCount: false }يشبه الاتصال shareReplay(x).

في هذه الحالة ، لن يكون هناك حساب مرجع. هذا يعني أنه حتى اكتمال الدفق الأصلي ، shareReplayسيتم الاشتراك فيه ، بغض النظر عما إذا كان لديه نفسه المشتركون النهائيون أم لا. سيتلقى جميع المشتركين الجدد آخر قيم س.

ShareReplay مقابل PublishReplay + refCount


للوهلة الأولى ، إنها shareReplay({ refCount: true, bufferSize: X })متطابقة publishReplay(X) + refCount() ، لكن هذا ليس صحيحًا تمامًا.

دعونا نرى ما هي أوجه التشابه وما هو الفرق.

لديهم نفس السلوك refCount- الاشتراك وإلغاء الاشتراك من الدفق الأصلي بناءً على عدد المشتركين. كما يتفاعلون أيضًا عند اكتمال الدفق الأصلي - يتلقى جميع المشتركين الجدد قيم X الأخيرة.

ومع ذلك ، إذا لم يتم الانتهاء من الدفق الأصلي ، في هذه الحالة عندما يكون لدينا publishReplay(X) + refCount()- يتلقى جميع المشتركين الجدد قيم X من المخزن المؤقت ، ثم سيتم إعادة توقيعهم باستخدام نفس القيمة ReplaySubject.
ولكن إذا استخدمنا shareReplay({ refCount: true, bufferSize: 1 })قيم X الأخيرة ، فلن يحصلوا عليها ، لأن داخلها يخلق قيمة جديدة ReplaySubject ويستخدمها لإعادة الاشتراك في المصدر.

أمثلة توضح هذا:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
 
  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
  
  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);





أمثلة فعلية في الزاوي


دعونا نرى كيفية استخدام عوامل البث المتعدد المدروسة في ظروف القتال.

نستخدم حصة


افترض أن لدينا مكونًا يحتاج إلى بيانات من الدفق الأصلي. يمكن أن يكون طلب http أو دولة أو أيا كان. ونحتاج أيضًا إلى معالجة البيانات ، مثل التصفية والفرز وما إلى ذلك.

@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
  }
}

والآن نحتاج إلى مكون آخر يُظهر المستخدم الأول فقط. إذا اشتركنا في دفق المصدر كما هو ، فعندئذٍ:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

والآن لدينا طلبان http ، سيتم تنفيذ عمليات الفرز أو التصفية مرتين.
نطبق share:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
      share()
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

نحن نعلم بالفعل أنه ينشئ Subjectواحدة جديدة تشترك في المصدر. عندما ينبعث المصدر ، يمرر الموضوع هذه القيمة إلى جميع المشتركين فيه.

يتم حل المشكلة ، وعندما اشتركنا firstUser$- اشتركنا subjectفي الدفق الداخلي ، وليس الدفق الأصلي مباشرة.

باستخدام ShareReplay


ShareReplayيتم تطبيقه عندما تحتاج إلى إصدار قيم X الأخيرة وتخزينها مؤقتًا وتكرارها. المثال النموذجي هو خدمة فردية تنفذ طلب http.


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
              .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}

لا يهم عدد المكونات التي ستطلب البيانات الآن أو في المستقبل ، سيكون هناك طلب http واحد فقط وسيتم حفظ النتيجة في المخزن المؤقت الداخلي ReplaySubject.
ربما لا تزال هناك حالة تحتاج فيها إلى إلغاء طلب غير مكتمل ، نظرًا لعدم وجود مشتركين ، فستحتاج إلى تقديم طلب refCount.

يمكن العثور على الرمز الكامل هنا .

All Articles