рд╣рдо рдорд▓реНрдЯреАрдХрд╛рд╕реНрдЯ RxJS рдСрдкрд░реЗрдЯрд░реЛрдВ рдХрд╛ рдЕрдзреНрдпрдпрди рдХрд░рддреЗ рд╣реИрдВ

рдирдорд╕реНрдХрд╛рд░, рд╣реЗрдмреНрд░! рдореИрдВ рдЖрдкрдХреЛ Netanel рдмреЗрд╕рд▓ рдХреЗ рд▓реЗрдЦ "рдЕрдВрдбрд░рд╕реНрдЯреИрдВрдбрд┐рдВрдЧ рдЖрд░рдПрдХреНрд╕рдЬреЗрдПрд╕ рдорд▓реНрдЯреАрдХрд╛рд╕реНрдЯ рдСрдкрд░реЗрдЯрд░реНрд╕ рдХрд╛ рдЕрдиреБрд╡рд╛рдж " рдкреНрд░рд╕реНрддреБрдд рдХрд░рддрд╛ рд╣реВрдВ ред

рдкреНрд░рд╕рд╛рд░рдг рдпрд╛ рдорд▓реНрдЯреАрдХрд╛рд╕реНрдЯ рдСрдкрд░реЗрдЯрд░ рдЕрдХреНрд╕рд░ рдЖрд░рдПрдХреНрд╕рдЬреЗрдПрд╕ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдЬрд╛рдирдиреЗ рдХреЗ рд▓рд┐рдП рд╕рдмрд╕реЗ рдХрдард┐рди рд╡рд┐рд╖рдп рд▓рдЧрддреЗ рд╣реИрдВред рдЗрд╕ рд▓реЗрдЦ рдореЗрдВ рдореИрдВ рд╕рдм рдХреБрдЫ рд╕реБрд▓рдн рддрд░реАрдХреЗ рд╕реЗ рд╕рдордЭрд╛рдиреЗ рдХреА рдХреЛрд╢рд┐рд╢ рдХрд░реВрдВрдЧрд╛ред

рд╣рдо рдорд▓реНрдЯреАрдХрд╛рд╕реНрдЯ рдСрдкрд░реЗрдЯрд░реЛрдВ рдХреА рдЖрдВрддрд░рд┐рдХ рд╕рдВрд░рдЪрдирд╛ рдФрд░ рдЙрдирдХреЗ рджреНрд╡рд╛рд░рд╛ рдХрд┐рдП рдЬрд╛рдиреЗ рд╡рд╛рд▓реЗ рдХрд╛рд░реНрдпреЛрдВ рдкрд░ рд╡рд┐рдЪрд╛рд░ рдХрд░реЗрдВрдЧреЗред

рдЖрд░рдПрдХреНрд╕рдЬреЗрдПрд╕ рдХреЗ рдмреБрдирд┐рдпрд╛рджреА рднрд╡рди рдмреНрд▓реЙрдХреЛрдВ рдХрд╛ рд╡рд░реНрдгрди рдХрд░рдХреЗ рд╢реБрд░реВ рдХрд░рддреЗ рд╣реИрдВред

рдирдореВрджрд╛рд░


рдЖрд░рдПрдХреНрд╕рдЬреЗрдПрд╕ рдореЗрдВ, рдЕрд╡рд▓реЛрдХрди рдпреЛрдЧреНрдп рд╡рд╕реНрддреБрдПрдВ (рдмрд╛рдж рдореЗрдВ "рдзрд╛рд░рд╛рдПрдВ" рдХреЗ рд░реВрдк рдореЗрдВ рд╕рдВрджрд░реНрднрд┐рдд) рд╢реБрд░реВ рдореЗрдВ рдардВрдбреА рд╣реЛрддреА рд╣реИрдВред рдЗрд╕рдХрд╛ рдорддрд▓рдм рд╣реИ рдХрд┐ рд╣рд░ рдмрд╛рд░ рдЬрдм рдЖрдк рдХрд┐рд╕реА рд╕реНрдЯреНрд░реАрдо рдХреА рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрддреЗ рд╣реИрдВ, рддреЛ рд╕рдмреНрд╕рдХреНрд░рд┐рдкреНрд╢рди рдХрд╛ рдХреЙрд▓рдмреИрдХ рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред

рдПрдХ рдмреЗрд╣рддрд░ рд╕рдордЭ рдХреЗ рд▓рд┐рдП, рдирд┐рдореНрдирд▓рд┐рдЦрд┐рдд рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдмрдирд╛рдПрдВ:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

рдХрдВрд╕реНрдЯреНрд░рдХреНрдЯрд░ ObservableрдПрдХрд▓ рдкреИрд░рд╛рдореАрдЯрд░ - рд╕рджрд╕реНрдпрддрд╛ рдХреЗ рдХреЙрд▓рдмреИрдХ рдХреЛ рд╕реНрд╡реАрдХрд╛рд░ рдХрд░рддрд╛ рд╣реИ
subscriptionFnред рдЬрдм рднреА рд╣рдо рд╕реНрдЯреНрд░реАрдо ( subscribe()) рдХреЛ рд╕рдмреНрд╕рдХреНрд░рд╛рдЗрдм рдХрд░реЗрдВрдЧреЗ, рдЙрд╕реЗ рдХреЙрд▓ рдХрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛ ред

рдХрднреА-рдХрднреА рд╡реЗ рдХрд┐рд╕реА рд╕рджрд╕реНрдпрддрд╛ рдХреЛ рдХреЙрд▓рдмреИрдХ рднреА рдХрд╣рддреЗ рд╣реИрдВ producer, рдХреНрдпреЛрдВрдХрд┐ рдпрд╣ рд╕рдмреНрд╕рдХреНрд░рд╛рдЗрдмрд░ (рд╣рдорд╛рд░реЗ рдХреЛрдб рдореЗрдВ рдСрдмреНрдЬрд░реНрд╡рд░ рдСрдмреНрдЬреЗрдХреНрдЯ) рдХреЗ рд▓рд┐рдП "рдорд╛рди" рднреА рдЙрддреНрдкрдиреНрди рдХрд░рддрд╛ рд╣реИред

рд╡рд┐рдзрд┐ subscribe()рдПрдХ рдЗрдирдкреБрдЯ рд▓реЗрддреА рд╣реИ observerред рдпрд╣ рддреАрди рддрд░реАрдХреЛрдВ рдХреЗ рд╕рд╛рде рдПрдХ рд╡рд╕реНрддреБ рд╣реИ next(), error(), complete():ред рд▓рд╛рдЗрд╡ RxJS рдореЗрдВ, рдЖрдк рдСрдмреНрдЬреЗрдХреНрдЯ рдХреЗ рдмрдЬрд╛рдп рддреАрди рдлрд╝рдВрдХреНрд╢рди рдкрд╛рд╕ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред

рд╡рд┐рдзрд┐, subscribe()рдЬрдм рдХрд╣рд╛ рдЬрд╛рддрд╛ рд╣реИ, рддреЛ рдЗрдирдкреБрдЯ рдХреЛ рдкрд╛рд╕ рдХрд░рдиреЗ рд╡рд╛рд▓реЗ рд╕рджрд╕реНрдпрддрд╛ рдлрд╝рдВрдХреНрд╢рди рдХреЛ рдХреЙрд▓ рдХрд░рддрд╛ рд╣реИ observerред

рд╣рдордиреЗ рдЕрдм рд╡рд┐рдзрд┐ рдХрд╛ рдЙрд▓реНрд▓реЗрдЦ рдирд╣реАрдВ рдХрд┐рдпрд╛unsubscribe, рд▓реЗрдХрд┐рди рдпрд╣ рдзреНрдпрд╛рди рдореЗрдВ рд░рдЦрд╛ рдЬрд╛рдирд╛ рдЪрд╛рд╣рд┐рдП рдХрд┐ рдкреНрд░рддреНрдпреЗрдХ рд╕рджрд╕реНрдпрддрд╛ рдЗрд╕реЗ рдирд╖реНрдЯ рдХрд░рдиреЗ рдХрд╛ рдПрдХ рддрд░реАрдХрд╛ рдкреНрд░рджрд╛рди рдХрд░рддреА рд╣реИред рд╕рдмрд╕реЗ рдЕрдзрд┐рдХ рдмрд╛рд░, рдПрдХ рд╕рджрд╕реНрдпрддрд╛ рдПрдХ рдлрд╝рдВрдХреНрд╢рди (рдпрд╛ рдЙрдЪрд┐рдд рд╡рд┐рдзрд┐ рдХреЗ рд╕рд╛рде рдПрдХ рд╡рд╕реНрддреБ) рд▓реМрдЯрд╛рддреА рд╣реИ, рдЬрд┐рд╕рдХреЗ рджреМрд░рд╛рди рдзрд╛рд░рд╛ рдФрд░ рдЙрд╕рдХреЗ рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЗ рдмреАрдЪ рдХрд╛ рд╕рдВрдмрдВрдз рдирд╖реНрдЯ рд╣реЛ рдЬрд╛рддрд╛ рд╣реИред

рдпрд╣ рд╕рдм рдмрд╣реБрдд рд╕рд░рд▓ рд╣реИред рдЖрдЗрдП рдЕрдм рд╡рд╛рд╕реНрддрд╡рд┐рдХрддрд╛ рдХреЗ рдХрд░реАрдм рдЬрд╛рдПрдВред рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдПрдХ рдзрд╛рд░рд╛ рдореЗрдВ рдПрдХ рджреЗрд╢реА XHR рдПрдкреАрдЖрдИ рд▓рдкреЗрдЯреЗрдВ


function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });
    xhr.open('GET', url);
    xhr.send();
    
    return () => xhr.abort()
  }
  
  return new Observable(subscriptionFn);
}

рд╣рдордиреЗ рдПрдХ рдлрд╝рдВрдХреНрд╢рди рд▓рд┐рдЦрд╛ рд╣реИ httpрдЬреЛ рдПрдХ URL рдкреНрд░рд╛рдкреНрдд рдХрд░рддрд╛ рд╣реИ, рдПрдХ http рдЕрдиреБрд░реЛрдз рдХреЛ рдирд┐рд╖реНрдкрд╛рджрд┐рдд рдХрд░рддрд╛ рд╣реИ, рдФрд░ рдПрдХ рд╕реНрдЯреНрд░реАрдо рджреЗрддрд╛ рд╣реИ рдЬреЛ рдкреНрд░рд╛рдкреНрдд http рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рдкреНрд░рд╛рдкреНрдд рдХрд░рддрд╛ рд╣реИред

рдЕрдм, рд╣рдорд╛рд░реЗ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдХреЛ рджреЗрдЦрддреЗ рд╣реБрдП, рдЖрдкрдХреЛ рдХреНрдпрд╛ рд▓рдЧрддрд╛ рд╣реИ рдХрд┐ рдЬрдм рд╣рдо рджреЛ рдмрд╛рд░ рдЗрд╕ рдзрд╛рд░рд╛ рдХреА рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрдВрдЧреЗ рддреЛ рдХреНрдпрд╛ рд╣реЛрдЧрд╛?


// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  }
});

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

рд╕рд╣реА рдврдВрдЧ рд╕реЗ, рджреЛ http рдЕрдиреБрд░реЛрдз рдирд┐рд╖реНрдкрд╛рджрд┐рдд рдХрд┐рдП рдЬрд╛рдПрдВрдЧреЗред рдпрджрд┐ рд╣рдо рдЕрд╡рд▓реЛрдХрди рдпреЛрдЧреНрдп рдХрдХреНрд╖рд╛ рдХреЗ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдкрд░ рдлрд┐рд░ рд╕реЗ рдЧреМрд░ рдХрд░рддреЗ рд╣реИрдВ, рддреЛ рд╣рдо рджреЗрдЦреЗрдВрдЧреЗ рдХрд┐ рдРрд╕рд╛ рдХреНрдпреЛрдВ рд╣реИред рдкреНрд░рддреНрдпреЗрдХ рдЧреНрд░рд╛рд╣рдХ рдПрдХ рд╕рджрд╕реНрдпрддрд╛ рдХреЙрд▓рдмреИрдХ рдХрд╣рддрд╛ рд╣реИ, рдЬреЛ рдкреНрд░рддреНрдпреЗрдХ рдмрд╛рд░ рдПрдХ HTTP рдЕрдиреБрд░реЛрдз рдХрд░рддрд╛ рд╣реИред



рдСрдкрд░реЗрдЯрд░реНрд╕


рдПрдХ рдСрдкрд░реЗрдЯрд░ рдПрдХ рдлрд╝рдВрдХреНрд╢рди рд╣реИ рдЬреЛ рдПрдХ рдЗрдирдкреБрдЯ рдХреЗ рд░реВрдк рдореЗрдВ рдПрдХ рд╕реНрдЯреНрд░реАрдо рд▓реЗрддрд╛ рд╣реИ, рдХреЛрдИ рднреА рдХрд╛рд░реНрд░рд╡рд╛рдИ рдХрд░рддрд╛ рд╣реИ, рдФрд░ рдПрдХ рд╕реНрдЯреНрд░реАрдо рджреЗрддрд╛ рд╣реИред

рд╣рдо рдЕрдкрдирд╛ рдкрд╣рд▓рд╛ рдСрдкрд░реЗрдЯрд░ рд▓рд┐рдЦреЗрдВрдЧреЗред
function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
        next(value) {
          observer.next(fn(value));
        }
      });
    });
  };
}

рдлрд╝рдВрдХреНрд╢рди map()рдПрдХ рдСрдкрд░реЗрдЯрд░ рджреЗрддрд╛ рд╣реИ рдЬреЛ рдореВрд▓ рд╕реНрдЯреНрд░реАрдо рдХреЛ рд╕реНрд╡реАрдХрд╛рд░ рдХрд░рддрд╛ рд╣реИ рдФрд░ рдПрдХ рд╕реНрдЯреНрд░реАрдо рджреЗрддрд╛ рд╣реИ рдЬрд┐рд╕рдореЗрдВ рдлрд╝рдВрдХреНрд╢рди рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рд╕рднреА рдкрд╛рд╕рд┐рдВрдЧ рдорд╛рди рдкрд╛рд░рд┐рдд рдХрд┐рдП рдЬрд╛рдПрдВрдЧреЗ fnред

рдЙрдиред рдЗрд╕рдХреЗ рдЕрдВрджрд░ рд╣рдореЗрд╢рд╛ рдЗрдирдкреБрдЯ рд╕реНрдЯреНрд░реАрдо рдХреА рд╕рджрд╕реНрдпрддрд╛ рд╣реЛрддреА рд╣реИред

рдЗрд╕ рдирдП рдСрдкрд░реЗрдЯрд░ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдиреЗ рд╕реЗ рдкрд╣рд▓реЗ, рд╣рдореЗрдВ рдХрд┐рд╕реА рддрд░рд╣ рдЗрд╕реЗ рд╕реНрдЯреНрд░реАрдо рдореЗрдВ рд╕рдВрд▓рдЧреНрди рдХрд░рдирд╛ рд╣реЛрдЧрд╛ред рд╣рдорд╛рд░реА рдХрдХреНрд╖рд╛ рдХрд╛ рд╡рд┐рд╕реНрддрд╛рд░ Observableрд╕реЗpipe()

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

рдПрдХ рд╕рд░рд▓ рд╡рд┐рдзрд┐, рдХреЛрдб рдХреА рд╕рд┐рд░реНрдл рдПрдХ рдкрдВрдХреНрддрд┐ред pipeрдпрд╣ рдСрдкрд░реЗрдЯрд░реЛрдВ рдХреА рдПрдХ рд╕рд░рдгреА рд▓реЗрддрд╛ рд╣реИ рдФрд░ рдмрджрд▓реЗ рдореЗрдВ рдЙрдиреНрд╣реЗрдВ рдХреЙрд▓ рдХрд░рддрд╛ рд╣реИ, рдкреНрд░рддреНрдпреЗрдХ рдЗрдирдкреБрдЯ рдХреЛ рдкрд┐рдЫрд▓реЗ рдПрдХ рдХреЗ рдкрд░рд┐рдгрд╛рдо рд╕реЗ рдЧреБрдЬрд░рддрд╛ рд╣реИред

рдЪрд▓реЛ рд╣рдорд╛рд░реЗ рдСрдкрд░реЗрдЯрд░ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░реЗрдВ:

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

рдЬрдм рдХреЙрд▓ рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ subscribe, рддреЛ рдЖрдЙрдЯрдкреБрдЯ рд╕реНрдЯреНрд░реАрдо рдХреА рд╕рджрд╕реНрдпрддрд╛ рдирд┐рд╖реНрдкрд╛рджрд┐рдд рдХреА рдЬрд╛рдПрдЧреА map(), рдФрд░ рдмрджрд▓реЗ рдореЗрдВ map, рдореВрд▓ рд╕реНрдЯреНрд░реАрдо рдХреА рд╕рджрд╕реНрдпрддрд╛ рдХреЛ рдЕрдВрджрд░ рдирд┐рд╖реНрдкрд╛рджрд┐рдд рдХрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛ред

Http рдзрд╛рд░рд╛ рдЙрд╕ рдореВрд▓реНрдп рдХрд╛ рдЙрддреНрд╕рд░реНрдЬрди рдХрд░рддреА рд╣реИ рдЬреЛ рдЗрд╕рдореЗрдВ рдЧрд┐рд░рддрд╛ рд╣реИ mapред рдлрд┐рд░, рдлрд╝рдВрдХреНрд╢рди рдирд┐рд╖реНрдкрд╛рджрд┐рдд рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ fn, mapрдорд╛рди рд╕реЗ рдЕрдВрддрд┐рдо рд╕рджрд╕реНрдпрддрд╛ рддрдХ рдХреА рдзрд╛рд░рд╛ рдирд┐рдХрд▓рддреА рд╣реИред рдпрд╣ observable chainрдереНрд░реЗрдб рдЪреЗрди рдХреА рддрд░рд╣ рдХрд╛рдо рдХрд░рддрд╛ рд╣реИ ред



рдпрджрд┐ рд╣рдо рджреЛ рдмрд╛рд░ рд╢реНрд░реГрдВрдЦрд▓рд╛ рдХреА рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрддреЗ рд╣реИрдВ, рддреЛ рд╢реНрд░реГрдВрдЦрд▓рд╛ рдореЗрдВ рдкреНрд░рддреНрдпреЗрдХ рд╕рджрд╕реНрдпрддрд╛ рдХреЛ рджреЛ рдмрд╛рд░ рдХрд╣рд╛ рдЬрд╛рдПрдЧрд╛ред

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
    .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));



рдФрд░ рдЕрдЧрд░ рдпрд╣ рд╡реНрдпрд╡рд╣рд╛рд░ рд╣рдореЗрдВ рд╢реЛрднрд╛ рдирд╣реАрдВ рджреЗрддрд╛ рд╣реИ? рдЕрдЧрд░ рд╣рдо рд╕рдмреНрд╕рдХреНрд░рд┐рдкреНрд╢рди рдлрд╝рдВрдХреНрд╢рди рдХреЛ рдХреЗрд╡рд▓ рдПрдХ рдмрд╛рд░ рдХреЙрд▓ рдХрд░рдирд╛ рдЪрд╛рд╣рддреЗ рд╣реИрдВ, рддреЛ рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдХрд┐рддрдиреЗ рд╕рдмреНрд╕рдХреНрд░рд┐рдкреНрд╢рди рд╣реЛрдВрдЧреЗ?

рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдХреНрдпрд╛ рд╣реЛрдЧрд╛ рдпрджрд┐ рд╣рдо рдПрдХ http рдЕрдиреБрд░реЛрдз рдХрд░рдирд╛ рдЪрд╛рд╣рддреЗ рд╣реИрдВ рдФрд░ рд╕рднреА рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЗ рд▓рд┐рдП рдкрд░рд┐рдгрд╛рдо рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░реЗрдВ? рдЗрд╕ рдорд╛рдорд▓реЗ рдореЗрдВ рдЖрдкрдХреЛ рдЬрд░реВрд░рдд рд╣реИ Subjectред

рд╡рд┐рд╖рдп


Subjectрдпрд╣ рдПрдХ рдзрд╛рд░рд╛ рдФрд░ рдЧреНрд░рд╛рд╣рдХ рджреЛрдиреЛрдВ рд╣реИред рдкреНрд░рд╡рд╛рд╣ - рдХреНрдпреЛрдВрдХрд┐ рдЗрд╕рдХреА рдПрдХ рд╡рд┐рдзрд┐ рд╣реИ subscribe(), рдЧреНрд░рд╛рд╣рдХ - рдХреНрдпреЛрдВрдХрд┐ рдпрд╣ рдЧреНрд░рд╛рд╣рдХ рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рдХреЛ рд▓рд╛рдЧреВ рдХрд░рддрд╛ рд╣реИ - рд╡рд┐рдзрд┐рдпрд╛рдБ next(), error(), complete().

рдЗрд╕реЗ рд▓рд┐рдЦрддреЗ рд╣реИрдВред

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

SubjectрдардВрдб рдзрд╛рд░рд╛ рдФрд░ рдХрдИ рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЗ рдмреАрдЪ рдПрдХ рдордзреНрдпрд╡рд░реНрддреА рдХреЗ рд░реВрдк рдореЗрдВ рдХрд╛рд░реНрдп рдХрд░ рд╕рдХрддрд╛ рд╣реИред

рд╣рдорд╛рд░реЗ рдЙрджрд╛рд╣рд░рдг рдХреЛ рдЗрд╕ рдкреНрд░рдХрд╛рд░ рдмрджрд▓реЗрдВ:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

рдЬрдм рдмреБрд▓рд╛рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ subject.subscribe(someFn), рддреЛ рдХреЗрд╡рд▓ рдПрдХ рд╕рд░рд▓ рдСрдкрд░реЗрд╢рди рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ - рд╕рд░рдгреА рдореЗрдВ рдПрдХ subject.observersрдлрд╝рдВрдХреНрд╢рди рдЬреЛрдбрд╝рдирд╛ someFnред

рдареАрдХ рд╣реИ, рдлрд┐рд░, рдЪреВрдВрдХрд┐ рдпрд╣ SubjectрдПрдХ рдЧреНрд░рд╛рд╣рдХ рдХреЗ рд░реВрдк рдореЗрдВ рднреА рд╡реНрдпрд╡рд╣рд╛рд░ рдХрд░рддрд╛ рд╣реИ, рдЖрдк рдЗрд╕реЗ рдореВрд▓ рдзрд╛рд░рд╛ рдХреЗ рд▓рд┐рдП рд╕рджрд╕реНрдпрддрд╛ рд▓реЗ рд╕рдХрддреЗ рд╣реИрдВ, рдЕрд░реНрдерд╛рддред рдЬрдм рдореВрд▓ рдзрд╛рдЧрд╛ рдПрдХ рдореВрд▓реНрдп рдЬрд╛рд░реА рдХрд░рддрд╛ рд╣реИ, рддреЛ рдЙрд╕реЗ рдХрд╣рд╛ рдЬрд╛рддрд╛ рд╣реИ subject.next(), рдЬреЛ рдкреНрд░рддреНрдпреЗрдХ рдЧреНрд░рд╛рд╣рдХ рдХреЛ рдЗрд╕ рдореВрд▓реНрдп рдХреЗ рд╣рд╕реНрддрд╛рдВрддрд░рдг рдХреЛ рдордЬрдмреВрд░ рдХрд░рддрд╛ рд╣реИ subjectред

рдЕрдм рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рд╕рджрд╕реНрдпрддрд╛ рдХрд╛ рдореВрд▓ рдХреЙрд▓рдмреИрдХ рдПрдХ рдмрд╛рд░ рдирд┐рд╖реНрдкрд╛рджрд┐рдд рд╣реЛ рдЧрдпрд╛ рд╣реИ, рдФрд░ рдХреЗрд╡рд▓ рдПрдХ http-request рдХреЛ рдирд┐рд╖реНрдкрд╛рджрд┐рдд рдХрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛ред



рдкрд╛рд░реНрдЯреА рдХреЗ рджрд┐рд╡рдВрдЧрдд


рдпрджрд┐ рд╣рдордиреЗ рд╕рд╛рдЗрди рдЕрдк рдХрд░рдиреЗ рд╕реЗ рдкрд╣рд▓реЗ рд╣реА рдореВрд▓ рдзрд╛рд░рд╛ рдкрд░ рдХрд╛рдо рдХрд┐рдпрд╛ рддреЛ рдХреНрдпрд╛ рд╣реЛрдЧрд╛?

рдкрд┐рдЫрд▓реЗ рдЙрджрд╛рд╣рд░рдг рдореЗрдВ рдЗрд╕реЗ рджрд┐рдЦрд╛рдирд╛ рд╕рдВрднрд╡ рдирд╣реАрдВ рд╣реЛрдЧрд╛, рдЪреВрдВрдХрд┐ http рдЕрддреБрд▓реНрдпрдХрд╛рд▓рд┐рдХ рд╣реИ, рднрд▓реЗ рд╣реА рдЖрдк рдЗрд╕рдХреЗ рддреБрд░рдВрдд рдмрд╛рдж рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрддреЗ рд╣реИрдВ, рдлрд┐рд░ рднреА рд╕рджрд╕реНрдпрддрд╛ рдХреЗ рдмрд╛рдж рдореВрд▓реНрдп рдЖрдПрдЧрд╛ред

рдЖрдЗрдП рдЬрд▓реНрджреА рд╕реЗ рдПрдХ рдЬрдирд░реЗрдЯрд┐рдВрдЧ рдлрдВрдХреНрд╢рди рдмрдирд╛рдПрдВ of:


function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

рдорд╛рдзреНрдпрдо рд╕реЗ рдмрдирд╛рдИ рдЧрдИ рдПрдХ рдзрд╛рд░рд╛ of()рдПрдХ рдХреЗ рдмрд╛рдж рдПрдХ, рд╕рдорд╛рди рд░реВрдк рд╕реЗ рдореВрд▓реНрдпреЛрдВ рдХрд╛ рдЙрддреНрд╕рд░реНрдЬрди рдХрд░рддреА рд╣реИред рдЗрд╕рдХреЗ рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрдиреЗ рдХреЗ subjectрдмрд╛рдж рд╣рдо рдЗрд╕рдХреА рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрдВрдЧреЗ ред

const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

рд╣рдорд╛рд░реЗ рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЛ рдХреБрдЫ рдирд╣реАрдВ рдорд┐рд▓рд╛ рд╣реИред рдХреНрдпреЛрдВ? рд╣рдорд╛рд░рд╛ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди "рджреЗрд░ рд╕реЗ" рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХрд╛ рд╕рдорд░реНрдерди рдирд╣реАрдВ рдХрд░рддрд╛ рд╣реИред рдЬрдм of()рдореВрд▓реНрдпреЛрдВ рд╕реЗ рдореВрд▓ рдзрд╛рд░рд╛ рдирд┐рдХрд▓рддреА рд╣реИ, рддреЛ рдЧреНрд░рд╛рд╣рдХ рдЕрднреА рддрдХ рдкрдВрдЬреАрдХреГрдд рдирд╣реАрдВ рд╣реИрдВ, рдпреЗ рдореВрд▓реНрдп рдХрд╣реАрдВ рдирд╣реАрдВ рдЬрд╛рдПрдВрдЧреЗред

рдХреЛрдгреАрдп рдкрд░ рд╡рд╛рд╕реНрддрд╡рд┐рдХ рдЙрджрд╛рд╣рд░рдгреЛрдВ рдореЗрдВ, рдпрд╣ рдЕрдЪреНрдЫреА рддрд░рд╣ рд╕реЗ рд╣реЛ рд╕рдХрддрд╛ рд╣реИ рдХрд┐ рд╕реНрд░реЛрдд рдзрд╛рд░рд╛ рдиреЗ рдХрд╛рдо рдХрд┐рдпрд╛, рд▓реЗрдХрд┐рди рдЖрдкрдХрд╛ рдШрдЯрдХ рдЕрднреА рддрдХ рдкреГрд╖реНрда рдкрд░ рдореМрдЬреВрдж рдирд╣реАрдВ рд╣реИред рдФрд░ рдЬрдм рдШрдЯрдХ рдкреНрд░рдХрдЯ рд╣реЛрддрд╛ рд╣реИ, рддреЛ рдпрд╣ рд╕реНрд░реЛрдд рдХреА рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрддрд╛ рд╣реИ, рд▓реЗрдХрд┐рди рдЙрди рдорд╛рдиреЛрдВ рдХреЛ рдкреНрд░рд╛рдкреНрдд рдирд╣реАрдВ рдХрд░рддрд╛ рд╣реИ рдЬреЛ рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рдкрд╛рд░рд┐рдд рд╣реЛ рдЪреБрдХреЗ рд╣реИрдВред

рд╕рдорд╕реНрдпрд╛ рдХреЛ рд╣рд▓ рдХрд░рдиреЗ рдХрд╛ рдПрдХ рддрд░реАрдХрд╛ рдпрд╣ рд╣реИ ReplaySubjectред рд╣рдо рдЗрд╕рдХреЗ рд╕рдВрд╕реНрдХрд░рдг рдХреА рд░реВрдкрд░реЗрдЦрд╛ рддреИрдпрд╛рд░ рдХрд░рддреЗ рд╣реИрдВ рдФрд░ рджреЗрдЦрддреЗ рд╣реИрдВ рдХрд┐ рдпрд╣ рдХреИрд╕реЗ рдХрд╛рдо рдХрд░рддрд╛ рд╣реИред


class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

рдЕрд╡рдзрд╛рд░рдгрд╛ рд╕рд░рд▓ рд╣реИред рдЬреИрд╕рд╛ рдХрд┐ рдирд╛рдо рд╕реЗ рдкрддрд╛ рдЪрд▓рддрд╛ рд╣реИ, ReplaySubjectрдпрд╣ рдПрдХ рд╡рд┐рд╢реЗрд╖ Subjectрд╣реИ рдЬреЛ рд╕рднреА рдирдП рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЗ рд▓рд┐рдП рдкреБрд░рд╛рдиреЗ рдореВрд▓реНрдпреЛрдВ рдХреЛ рдкреБрди: рдкреЗрд╢ рдХрд░ рд╕рдХрддрд╛ рд╣реИред

рдкреНрд░рддреНрдпреЗрдХ рдЬрд╛рд░реА рдХрд┐рдП рдЧрдП рдореВрд▓реНрдп рдХреЛ рд╕рднреА рд╡рд░реНрддрдорд╛рди рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЛ рд╣рд╕реНрддрд╛рдВрддрд░рд┐рдд рдХрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛ рдФрд░ рднрд╡рд┐рд╖реНрдп рдХреЗ рд▓реЛрдЧреЛрдВ рдХреЗ рд▓рд┐рдП рд╕рд╣реЗрдЬрд╛ рдЬрд╛рдПрдЧрд╛, рдмрдлрд░ рдХрд╛ рдЖрдХрд╛рд░ bufferSizeрдХрдВрд╕реНрдЯреНрд░рдХреНрдЯрд░ рдореЗрдВ рд╕реЗрдЯ рд╣реИред

рдкрд┐рдЫрд▓реЗ рдЙрджрд╛рд╣рд░рдг рдХреЛ рдлрд┐рд░ рд╕реЗ рд▓рд┐рдЦреЗрдВ ReplaySubjectред

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

рдкрд░рд┐рдгрд╛рдо рдмрджрд▓ рдЧрдпрд╛ рд╣реИред

рджреЗрд░ рд╕реЗ рд╕рджрд╕реНрдпрддрд╛ рдХреЗ рдмрд╛рд╡рдЬреВрдж, рд╣рдордиреЗ рдЙрди рд╕рднреА рдХреЛ рдкрдХрдбрд╝ рд▓рд┐рдпрд╛ред



рд╕рдВрдХреНрд╖реЗрдк рдореЗрдВ, рдЙрджреНрджреЗрд╢реНрдп ReplaySubjectрд╕рднреА рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЗ рд▓рд┐рдП рдореВрд▓реНрдпреЛрдВ рдХрд╛ рд╡рд┐рддрд░рдг рдФрд░ рдЙрдиреНрд╣реЗрдВ рднрд╡рд┐рд╖реНрдп рдХреЗ "рд▓реЗрдЯ" рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЗ рд▓рд┐рдП рдХреИрд╢рд┐рдВрдЧ рдХрд░рдирд╛ рд╣реИред

рдЖрдЧреЗ рдмрдврд╝рдиреЗ рд╕реЗ рдкрд╣рд▓реЗ, рдореИрдВ рд╕реБрдЭрд╛рд╡ рджреЗрддрд╛ рд╣реВрдВ рдХрд┐ рдЖрдк рдЕрдкрдирд╛ рд╕реНрд╡рдпрдВ рдХрд╛ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рд▓рд┐рдЦрдиреЗ рдХрд╛ рдкреНрд░рдпрд╛рд╕ рдХрд░реЗрдВ BehaviorSubjectред рдЖрдк рд▓реЗрдЦ рдХреЗ рдЕрдВрдд рдореЗрдВ рддреИрдпрд╛рд░ рдХреЛрдб рдкрд╛ рд╕рдХрддреЗ рд╣реИрдВред

рдЕрдм рд╣рдо рдЕрдВрдд рдореЗрдВ рдорд▓реНрдЯреАрдХрд╛рд╕реНрдЯ рдСрдкрд░реЗрдЯрд░реЛрдВ рдкрд░ рдЪрд▓рддреЗ рд╣реИрдВред рдореБрдЭреЗ рдЙрдореНрдореАрдж рд╣реИ рдХрд┐ рдЙрдкрд░реЛрдХреНрдд рдЙрджрд╛рд╣рд░рдг рдЖрдкрдХреЛ рддреЗрдЬреА рд╕реЗ рд╕рдордЭрдиреЗ рдореЗрдВ рдорджрдж рдХрд░реЗрдВрдЧреЗред

рдорд▓реНрдЯреАрдХрд╛рд╕реНрдЯ рдСрдкрд░реЗрдЯрд░реНрд╕


рдорд▓реНрдЯрд┐рдХрд╛рд╕реНрдЯ рдФрд░ рдХрдиреЗрдХреНрдЯ


рдСрдкрд░реЗрдЯрд░ рдХрдИ рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЛ рд╕реНрд░реЛрдд рд╕реНрдЯреНрд░реАрдо рдЬрд╛рд░реА рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП multicast() рдЙрдкрдпреЛрдЧ рдХрд░рддрд╛ Subjectрд╣реИред


import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

multicastрдПрдХ рдРрд╕реА рд╡рд╕реНрддреБ рд▓реМрдЯрд╛рддрд╛ ConnectableObservableрд╣реИ рдЬрд┐рд╕рдореЗрдВ рдПрдХ рд╡рд┐рдзрд┐ рд╣реЛрддреА рд╣реИ connectред рдЗрд╕рдХрд╛ рдЙрджреНрджреЗрд╢реНрдп рд╕реНрд░реЛрдд рд╕реНрдЯреНрд░реАрдо рдореЗрдВ рдкреНрд░рд╛рдкреНрдд рд╡рд┐рд╖рдп рдХреА рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрдирд╛ рд╣реИред

рд╡рд┐рдзрд┐ connectрд╣рдореЗрдВ рдпрд╣ рдирд┐рд░реНрдзрд╛рд░рд┐рдд рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддреА рд╣реИ рдХрд┐ рдореВрд▓ рдзрд╛рдЧреЗ рдХрд╛ рдирд┐рд╖реНрдкрд╛рджрди рдХрдм рд╢реБрд░реВ рдХрд┐рдпрд╛ рдЬрд╛рдПред рдзреНрдпрд╛рди рдореЗрдВ рд░рдЦрдиреЗ рдХреЗ рд▓рд┐рдП рдПрдХ рдХреНрд╖рдг рд╣реИ - рдЙрд╕ рд╕реНрд░реЛрдд рд╕реЗ рд╕рджрд╕реНрдпрддрд╛ рд╕рдорд╛рдкреНрдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдЬрд┐рд╕реЗ рдЖрдкрдХреЛ рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ:

connectableSubscription.unsubscribe();

рд╣рдо рд╕рд░рд▓ рддрдХ рд╕реАрдорд┐рдд рдирд╣реАрдВ рд╣реИрдВ Subjectред рдЗрд╕рдХреЗ рдмрдЬрд╛рдп, рдЖрдк рдХрд┐рд╕реА рднреА рд╡реНрдпреБрддреНрдкрдиреНрди рд╡рд░реНрдЧ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ, рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП ReplaySubject:

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

рдЗрд╕ рдХреЛрдб рд╕реЗ рдЖрдк рдЕрдиреБрдорд╛рди рд▓рдЧрд╛ рд╕рдХрддреЗ рд╣реИрдВ рдХрд┐ рд╣реБрдб рдХреЗ рдиреАрдЪреЗ рдХреНрдпрд╛ рд╣реЛрдЧрд╛ред

рдЬрдм рд╣рдо рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╣реИрдВ multicast, рддреЛ рд╣рдо рди рдХреЗрд╡рд▓ рд╕реНрдерд╛рдирд╛рдВрддрд░рд┐рдд рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ Subject, рдмрд▓реНрдХрд┐ рдПрдХ рдХрд╛рд░рдЦрд╛рдирд╛ рдлрд╝рдВрдХреНрд╢рди рднреА рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ , рдЬреЛ рд╣рд░ рдмрд╛рд░ рдПрдХ рдирдпрд╛ рд░рд┐рдЯрд░реНрди рджреЗрддрд╛ рд╣реИ Subjectред

рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рдкреВрд░рд╛ рдХрд┐рдпрд╛ рд╣реБрдЖ рдкреБрди: рдЙрдкрдпреЛрдЧ subjectрдирд╣реАрдВ рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ, рдлреИрдХреНрдЯрд░реА рдлрд╝рдВрдХреНрд╢рди рдЗрд╕ рд╕рдорд╕реНрдпрд╛ рдХреЛ рд╣рд▓ рдХрд░рддрд╛ рд╣реИред

interval(1000).pipe(
  multicast(() => new Subject())
)

Refount


рдЬрдм рд╣рдо рдСрдкрд░реЗрдЯрд░ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╣реИрдВ multicast(), рддреЛ рд╣рдо connect()рдореВрд▓ рдЕрд╡рд▓реЛрдХрди рдХреЗ рдирд┐рд╖реНрдкрд╛рджрди рдХреЛ рд╢реБрд░реВ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдХреЙрд▓ рдХреЗ рд▓рд┐рдП рдЬрд┐рдореНрдореЗрджрд╛рд░ рд╣реЛрддреЗ рд╣реИрдВ ред рдЗрд╕рдХреЗ рдЕрд▓рд╛рд╡рд╛, рд╣рдо рдЕрднреА рднреА рд╕рдВрднрд╡ рд╕реНрдореГрддрд┐ рд▓реАрдХ рдХреЗ рд▓рд┐рдП рдирд┐рдЧрд░рд╛рдиреА рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд╣реИ, рдореИрдиреНрдпреБрдЕрд▓ рд░реВрдк рд╕реЗ рд╕рджрд╕реНрдпрддрд╛ рд╕рдорд╛рдкреНрдд ConnectableSubscriptionред

рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХрд╛ рд╕реНрд╡рдЪрд╛рд▓рди рддреНрд░реБрдЯрд┐рдпреЛрдВ рд╕реЗ рдмрдЪрдиреЗ рдФрд░ рдХреЛрдб рдХреЛ рд╕рд░рд▓ рдХрд░реЗрдЧрд╛ред RxJS рдбреЗрд╡рд▓рдкрд░реНрд╕ рдиреЗ рдЬрд┐рд╕ рддрд░рд╣ рд╕реЗ рд╣рдорд╛рд░реЗ рдмрд╛рд░реЗ рдореЗрдВ рд╕реЛрдЪрд╛ рдФрд░ рдПрдХ refCountрдСрдкрд░реЗрдЯрд░ рдмрдирд╛рдпрд╛ ред

refCountрд╕рджрд╕реНрдпрддрд╛ рдХреА рдЧрдгрдирд╛ рдХрд░рддрд╛ рд╣реИ рдФрд░ рдЬрдм рдкрд╣рд▓реА рдмрд╛рд░ рдкреНрд░рдХрдЯ рд╣реЛрддрд╛ рд╣реИ, рддреЛ рдпрд╣ рдХреЙрд▓ рдХрд░рддрд╛ рд╣реИ connect(), рдЕрд░реНрдерд╛рддред рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрддрд╛ рд╣реИред рдЬрдм рдпрд╣ рд╢реВрдиреНрдп рдкрд░ рд╡рд╛рдкрд╕ рдШрдЯрддрд╛ рд╣реИ, рддреЛ рдПрдХ рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рдХрд╣рд╛ рдЬрд╛рдПрдЧрд╛ред

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)
 
// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubcribe();
}, 3000)

рдзреНрдпрд╛рди рджреЗрдВ рдХрд┐ refCountрд╣рдо рд╕рд╛рдорд╛рдиреНрдп рд░реВрдк рд╕реЗ рдЕрд╡рд▓реЛрдХрди рдХрд░рдиреЗ рдХреЗ рдмрд╛рдж рдирд╣реАрдВ ConnectableObservableред

рдкреНрд░рдХрд╛рд╢рд┐рдд рдХрд░реЗрдВ рдФрд░ рдЗрд╕рдХреЗ рд╡рд┐рдХрд▓реНрдк


multicast() + Subject + refCount()рдпрд╣ рдЖрд░рдПрдХреНрд╕рдЬреЗрдПрд╕ рдореЗрдВ рдПрдХ рдХрд╛рдлреА рд╡рд┐рд╢рд┐рд╖реНрдЯ рдорд╛рдорд▓рд╛ рд╣реИ рдФрд░ рдбреЗрд╡рд▓рдкрд░реНрд╕ рдиреЗ рдЗрд╕реЗ рдПрдХ рдСрдкрд░реЗрдЯрд░ рдХреЗ рд▓рд┐рдП рдХрдо рдХрд░ рджрд┐рдпрд╛ рд╣реИред

рдЖрдЗрдП рджреЗрдЦреЗрдВ рдХрд┐ рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдХреНрдпрд╛ рд╡рд┐рдХрд▓реНрдк рд╣реИрдВред

  • publish() рдмрд░рд╛рдмрд░ multicast(() => new Subject())
    const connectableObservable = interval(1000).pipe(
      publish()
    )
    
    connectableObservable.connect();
    

  • publishBehavior() рдмрд░рд╛рдмрд░ multicast(new BehaviorSubject())
    const connectableObservable = interval(1000).pipe(
      publishBehavior(100)
    )
    
    connectableObservable.connect();
    

  • publishReplay() рдмрд░рд╛рдмрд░ multicast(() => new ReplaySubject(x))
    const connectableObservable = interval(1000).pipe(
      publishReplay(3)
    )
    
    connectableObservable.connect();
    

  • publishLast() рдмрд░рд╛рдмрд░ multicast(new AsyncSubject())
    const connectableObservable = interval(1000).pipe(
      take(2),
      publishLast()
    )
    
    connectableObservable.connect();
    

  • share() рдмрд░рд╛рдмрд░ multicast(() => new Subject()) + refCount()
    const source = interval(1000).pipe(
      share()
    )
    

  • shareReplay(bufferSize) рдпрд╣ рдПрдХ рдорд▓реНрдЯреАрдХрд╛рд╕реНрдЯ рдСрдкрд░реЗрдЯрд░ рд╣реИ рдЬреЛ рдЙрдкрдпреЛрдЧ рдХрд░рддрд╛ рд╣реИ ReplaySubjectред рдЙрд╕рдХрд╛ рдХреЛрдИ рдЕрдВрджрд░ рдирд╣реАрдВ рд╣реИ multicast()рдФрд░ рдЙрд╕рдХрд╛ рдкрд░рд┐рдгрд╛рдо рдЕрд╡рд▓реЛрдХрдиреАрдп рд╣реИ, рдирд╣реАрдВ ConnectableObservableред рдЗрд╕реЗ рдЗрд╕рдХреЗ рд╕рд╛рде refCountрдпрд╛ рдмрд┐рдирд╛ рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИред рдпрд╣рд╛рдБ рджреЛрдиреЛрдВ рд╡рд┐рдХрд▓реНрдк рд╣реИрдВ:

    interval(1000).pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    )
    
    interval(1000).pipe(
      shareReplay(1)
    )
    

рдЬрдм shareReplayрдХреЙрд▓ рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ рддреЛ рдХреЙрд▓ рдХрд░рдиреЗ { refCount: false }рдЬреИрд╕рд╛ рд╣реЛрддрд╛ рд╣реИ shareReplay(x)ред

рдЗрд╕ рдорд╛рдорд▓реЗ рдореЗрдВ, рдХреЛрдИ рд╕рдВрджрд░реНрдн рдЧрд┐рдирддреА рдирд╣реАрдВ рд╣реЛрдЧреАред рдЗрд╕рдХрд╛ рдорддрд▓рдм рдпрд╣ рд╣реИ рдХрд┐ рдЬрдм рддрдХ рдореВрд▓ рдзрд╛рд░рд╛ рдкреВрд░реА рдирд╣реАрдВ рд╣реЛ рдЬрд╛рддреА рд╣реИ, рддрдм рддрдХ рдЗрд╕реЗ shareReplayрд╕рдмреНрд╕рдХреНрд░рд╛рдЗрдм рдХрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛, рднрд▓реЗ рд╣реА рдЗрд╕рдХреЗ рдЕрдВрддрд┐рдо рдЧреНрд░рд╛рд╣рдХ рд╣реЛрдВ рдпрд╛ рди рд╣реЛрдВред рд╕рднреА рдирдП рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЛ рдЕрдВрддрд┐рдо x рдорд╛рди рдкреНрд░рд╛рдкреНрдд рд╣реЛрдЧрд╛ред

shareReplay рдмрдирд╛рдо publishReplay + refCount


рдкрд╣рд▓реА рдирдЬрд╝рд░ рдореЗрдВ , рдпрд╣ shareReplay({ refCount: true, bufferSize: X })рд╕рдорд╛рди рд╣реИ publishReplay(X) + refCount() , рд▓реЗрдХрд┐рди рдпрд╣ рдкреВрд░реА рддрд░рд╣ рд╕реЗ рд╕рдЪ рдирд╣реАрдВ рд╣реИред

рдЖрдЗрдП рджреЗрдЦреЗрдВ рдХрд┐ рдХреНрдпрд╛ рд╕рдорд╛рдирддрд╛рдПрдВ рд╣реИрдВ рдФрд░ рдХреНрдпрд╛ рдЕрдВрддрд░ рд╣реИред

рдЙрдирдХреЗ рдкрд╛рд╕ рдПрдХ рд╣реА рд╡реНрдпрд╡рд╣рд╛рд░ рд╣реИ refCount- рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреА рд╕рдВрдЦреНрдпрд╛ рдХреЗ рдЖрдзрд╛рд░ рдкрд░ рдореВрд▓ рдзрд╛рд░рд╛ рд╕реЗ рд╕рджрд╕реНрдпрддрд╛ рдФрд░ рд╕рджрд╕реНрдпрддрд╛ рд╕рдорд╛рдкреНрдд рдХрд░рдирд╛ред рдореВрд▓ рдзрд╛рд░рд╛ рдкреВрд░реА рд╣реЛрдиреЗ рдкрд░ рд╡реЗ рднреА рдЙрд╕реА рддрд░рд╣ рдХреА рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛ рдХрд░рддреЗ рд╣реИрдВ - рд╕рднреА рдирдП рдЧреНрд░рд╛рд╣рдХ X рдЕрдВрддрд┐рдо рдорд╛рди рдкреНрд░рд╛рдкреНрдд рдХрд░рддреЗ рд╣реИрдВред

рд╣рд╛рд▓рд╛рдБрдХрд┐, рдЕрдЧрд░ рдореВрд▓ рдзрд╛рд░рд╛ рдХреЛ рдЕрднреА рдЕрдВрддрд┐рдо рд░реВрдк рдирд╣реАрдВ рджрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИ, рдЗрд╕ рдорд╛рдорд▓реЗ рдореЗрдВ рдЬрдм рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рд╣реИ publishReplay(X) + refCount()- рд╕рднреА рдирдП рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЛ рдмрдлрд░ рд╕реЗ рдПрдХреНрд╕ рдорд╛рди рдкреНрд░рд╛рдкреНрдд рд╣реЛрддрд╛ рд╣реИ, рдФрд░ рдлрд┐рд░ рдЙрд╕реА рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдлрд┐рд░ рд╕реЗ рд╣рд╕реНрддрд╛рдХреНрд╖рд░ рдХрд┐рдП рдЬрд╛рдПрдВрдЧреЗ ReplaySubjectред
рд▓реЗрдХрд┐рди рдЕрдЧрд░ рд╣рдо shareReplay({ refCount: true, bufferSize: 1 })рдЕрдВрддрд┐рдо X рдорд╛рдиреЛрдВ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╣реИрдВ, рддреЛ рд╡реЗ рдЗрд╕реЗ рдкреНрд░рд╛рдкреНрдд рдирд╣реАрдВ рдХрд░реЗрдВрдЧреЗ, рдХреНрдпреЛрдВрдХрд┐ рдЗрд╕рдХреЗ рдЕрдВрджрд░ рдпрд╣ рдПрдХ рдирдпрд╛ рдмрдирд╛рддрд╛ рд╣реИ ReplaySubjectрдФрд░ рдЗрд╕рдХрд╛ рдЙрдкрдпреЛрдЧ рд╕реНрд░реЛрдд рдХреЛ рдлрд┐рд░ рд╕реЗ рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрдиреЗ рдХреЗ рд▓рд┐рдП рдХрд░рддрд╛ рд╣реИред

рдЗрд╕реЗ рджрд░реНрд╢рд╛рдиреЗ рд╡рд╛рд▓реЗ рдЙрджрд╛рд╣рд░рдг:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
 
  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
  
  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);





рдПрдВрдЧреБрд▓рд░ рдореЗрдВ рд╡рд╛рд╕реНрддрд╡рд┐рдХ рдЙрджрд╛рд╣рд░рдг


рдЖрдЗрдП рджреЗрдЦреЗрдВ рдХрд┐ рдореБрдХрд╛рдмрд▓рд╛ рд╕реНрдерд┐рддрд┐рдпреЛрдВ рдореЗрдВ рдЕрдзреНрдпрдпрди рдХрд┐рдП рдЧрдП рдорд▓реНрдЯреАрдХрд╛рд╕реНрдЯ рдСрдкрд░реЗрдЯрд░реЛрдВ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХреИрд╕реЗ рдХрд░реЗрдВред

рд╣рдо рд╢реЗрдпрд░ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╣реИрдВ


рдорд╛рди рд▓реАрдЬрд┐рдП рдХрд┐ рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдПрдХ рдШрдЯрдХ рд╣реИ рдЬрд┐рд╕реЗ рдореВрд▓ рдзрд╛рд░рд╛ рд╕реЗ рдбреЗрдЯрд╛ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред рдпрд╣ рдПрдХ http рдЕрдиреБрд░реЛрдз, рдПрдХ рд░рд╛рдЬреНрдп, рдпрд╛ рдЬреЛ рдХреБрдЫ рднреА рд╣реЛ рд╕рдХрддрд╛ рд╣реИред рдФрд░ рд╣рдореЗрдВ рдбреЗрдЯрд╛ рд╣реЗрд░рдлреЗрд░ рдХреА рднреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ, рдЬреИрд╕реЗ рдлрд╝рд┐рд▓реНрдЯрд░рд┐рдВрдЧ, рд╕реЙрд░реНрдЯрд┐рдВрдЧ рдЖрджрд┐ред

@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
  }
}

рдФрд░ рдЕрдм рд╣рдореЗрдВ рдПрдХ рдФрд░ рдШрдЯрдХ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ рдЬреЛ рдХреЗрд╡рд▓ рдкрд╣рд▓реЗ рдЙрдкрдпреЛрдЧрдХрд░реНрддрд╛ рдХреЛ рджрд┐рдЦрд╛рддрд╛ рд╣реИред рдпрджрд┐ рд╣рдо рд╕реНрд░реЛрдд рдзрд╛рд░рд╛ рдХреА рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрддреЗ рд╣реИрдВ, рддреЛ рдпрд╣ рд╣реИ:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

рдФрд░ рдЕрдм рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рджреЛ http рдЕрдиреБрд░реЛрдз рд╣реИрдВ, рджреЛ рдмрд╛рд░ рд╕реЙрд░реНрдЯрд┐рдВрдЧ рдпрд╛ рдлрд╝рд┐рд▓реНрдЯрд░рд┐рдВрдЧ рдСрдкрд░реЗрд╢рди рдХрд┐рдП рдЬрд╛рдПрдВрдЧреЗред
рд╣рдо рд▓рд╛рдЧреВ рд╣реЛрддреЗ рд╣реИрдВ share:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
      share()
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

рд╣рдо рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рдЬрд╛рдирддреЗ рд╣реИрдВ рдХрд┐ рд╡рд╣ рдПрдХ рдирдпрд╛ рдмрдирд╛рддрд╛ Subjectрд╣реИ рдЬреЛ рд╕реНрд░реЛрдд рдХреА рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрддрд╛ рд╣реИред рдЬрдм рд╕реНрд░реЛрдд рдирд┐рдХрд▓рддрд╛ рд╣реИ, рддреЛ рд╡рд┐рд╖рдп рдЗрд╕ рдореВрд▓реНрдп рдХреЛ рдЕрдкрдиреЗ рд╕рднреА рдЧреНрд░рд╛рд╣рдХреЛрдВ рдХреЗ рдкрд╛рд╕ рднреЗрдЬ рджреЗрддрд╛ рд╣реИред

рд╕рдорд╕реНрдпрд╛ рд╣рд▓ рд╣реЛ рдЧрдИ рд╣реИ, рдФрд░ рдЬрдм рд╣рдордиреЗ рд╕рджрд╕реНрдпрддрд╛ рд▓реА рд╣реИ firstUser$- рд╣рдордиреЗ рдЖрдВрддрд░рд┐рдХ рдХреА рд╕рджрд╕реНрдпрддрд╛ рд▓реА рд╣реИ subject, рдФрд░ рд╕реАрдзреЗ рдореВрд▓ рдзрд╛рд░рд╛ рдирд╣реАрдВред

ShareReplay рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдирд╛


ShareReplayрд▓рд╛рдЧреВ рд╣реЛрддрд╛ рд╣реИ рдЬрдм рдЖрдкрдХреЛ рдЕрдВрддрд┐рдо X рдорд╛рдиреЛрдВ рдХрд╛ рдЙрддреНрд╕рд░реНрдЬрди, рдХреИрд╢ рдФрд░ рджреЛрд╣рд░рд╛рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реЛрддреА рд╣реИред рдПрдХ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдЙрджрд╛рд╣рд░рдг рдПрдХ рд╕рд┐рдВрдЧрд▓рдЯрди рд╕реЗрд╡рд╛ рд╣реИ рдЬреЛ http рдЕрдиреБрд░реЛрдз рдХрд░рддрд╛ рд╣реИред


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
              .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}

рдЗрд╕рд╕реЗ рдХреЛрдИ рдлрд░реНрдХ рдирд╣реАрдВ рдкрдбрд╝рддрд╛ рдХрд┐ рдХрд┐рддрдиреЗ рдШрдЯрдХ рдЕрднреА рдпрд╛ рднрд╡рд┐рд╖реНрдп рдореЗрдВ рдбреЗрдЯрд╛ рдХрд╛ рдЕрдиреБрд░реЛрдз рдХрд░реЗрдВрдЧреЗ, рдХреЗрд╡рд▓ рдПрдХ http рдЕрдиреБрд░реЛрдз рд╣реЛрдЧрд╛ рдФрд░ рдкрд░рд┐рдгрд╛рдо рдЖрдВрддрд░рд┐рдХ рдмрдлрд░ рдореЗрдВ рд╕рд╣реЗрдЬрд╛ рдЬрд╛рдПрдЧрд╛ ReplaySubjectред
рдЕрднреА рднреА рдПрдХ рдорд╛рдорд▓рд╛ рд╣реЛ рд╕рдХрддрд╛ рд╣реИ рдЬрд╣рд╛рдВ рдЖрдкрдХреЛ рдЕрдкреВрд░реНрдг рдЕрдиреБрд░реЛрдз рдХреЛ рд░рджреНрдж рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ, рдХреНрдпреЛрдВрдХрд┐ рдХреЛрдИ рдЧреНрд░рд╛рд╣рдХ рдирд╣реАрдВ рд╣реИрдВ, рддреЛ рдЖрдкрдХреЛ рдЖрд╡реЗрджрди рдХрд░рдирд╛ рд╣реЛрдЧрд╛ refCountред

рдкреВрд░рд╛ рдХреЛрдб рдпрд╣рд╛рдВ рдкрд╛рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ ред

All Articles