Interactive upload of files to the server using RxJS



It has been a long time since I wrote my last article on the basics of RxJS. In the comments, I was asked to show more complex examples that would be useful in practice. So I decided to take a break from the theory, and today we'll talk about uploading files.

What do we do?

  • Write a small page where the user can select a file to upload to the server
  • Add a progress bar that displays the upload progress
  • Add the ability to cancel the upload by clicking a cancel button

To follow this article, you will need basic RxJS knowledge: what Observables and operators are, including higher-order Observable (HOO) operators.

Let's not drag our feet and get straight to work!

Preparation


First, we need a server that can accept file upload requests. Any server will do; for this article I will use node.js with express and multer:

const express = require("express");
const multer  = require("multer");

const app = express();
const upload = multer({ dest:"files" });

app.post("/upload", upload.single("file"), function (req, res) {
    const { file } = req;

    if (file) {
        res.send("File uploaded successfully");
    } else {
        // Express has no res.error(); send an explicit error status instead
        res.status(400).send("Error");
    }

});

app.listen(3000);

Now create an html page on which we will place all the necessary elements:

<!doctype html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport"
          content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>File uploading</title>
</head>
<body>
    <label for="file">load file: <input id="file" type="file"></label>
    <button id="upload">Upload</button>
    <button id="cancel">Cancel</button>
    <div id="progress-bar" style="width: 0; height: 2rem; background-color: aquamarine;"></div>
    <script src="index.js" type="text/javascript"></script>
</body>
</html>

The page now has 4 elements that the user will interact with:

  • An input of type file, so that the user can select the file to upload
  • An Upload button that starts the upload when clicked
  • A Cancel button that cancels the upload
  • A progress bar with a starting width of 0, whose width we will change during the upload

At the very end of the body tag, I added a link to the index.js script, which we will also need to create:

// Get references to the elements the user interacts with
const input = document.querySelector('#file');
const uploadBtn = document.querySelector('#upload');
const progressBar = document.querySelector('#progress-bar');
const cancelBtn = document.querySelector('#cancel');

It should look something like this:



Everything is a stream


To tell the browser which file to upload, the user must click the “Choose file” button. The operating system's file dialog then opens and shows the folder tree. Once a file is selected, the browser loads all the necessary information about it.

How do we know that the user has selected a file? There is a 'change' event for this. After it fires, we can read the files list of the input, which holds the file data.

How do we listen to the change event? We could use the addEventListener method. But we are working with RxJS, where any event can be represented as a stream:

fromEvent(input, 'change').pipe(
    // Take the selected file from the input
    map(() => input.files[0])
).subscribe({
    next: file => console.log(file)
});

Add the upload function, which will upload the file to the server. For now, it only logs the file:

function upload(file) {
  console.log(file);
}

The upload function should be called after clicking the uploadBtn button:

fromEvent(uploadBtn, 'click').subscribe({
  next: () => upload(input.files[0])
});

Combining streams


Right now our code is no different from what we would write with addEventListener. Yes, it works, but if we leave it like this, we lose the advantages that RxJS offers.

What can we do? Let's describe the sequence of steps for uploading a file:

  • File selection
  • Click uploadBtn
  • Extract file from input.files
  • File upload

Now let's translate this sequence into code. But how do we combine the input and uploadBtn streams? The switchMap operator helps here: it lets us project one stream onto another:

fromEvent(input, 'change').pipe(
    switchMap(() => fromEvent(uploadBtn, 'click')),
    map(() => input.files[0])
).subscribe({
    next: file => upload(file)
});

This code closely follows the sequence of steps described above. The user selects a file, switchMap fires, and we subscribe to the uploadBtn clicks. Nothing else happens at that point.

switchMap passes to the outer stream only the values emitted by fromEvent(uploadBtn, 'click'). To start the upload, the second step has to happen, namely a click on uploadBtn. Then map runs and extracts the file from the list, and upload is called in subscribe.

The most interesting part is that the order of the steps cannot be broken: for upload to run, the 'change' event must fire first.

One problem remains, though. The user can select a file and then deselect it. Then, when we try to upload, undefined is passed to the upload function. To avoid this, we add a check:

fromEvent(input, 'change').pipe(
    switchMap(() => fromEvent(uploadBtn, 'click')),
    map(() => input.files[0]),
    filter(file => !!file)
).subscribe({
    next: file => upload(file)
});

Working with xhr


It's time to implement the hardest part: the upload itself. I will show it using xhr because fetch, at the time of writing, cannot track upload progress.
You can implement the upload with any other library, for example axios or jQuery.ajax.


Since I use multer on the server side, I have to send the file inside a form (multer accepts data only in this format). For this, I wrote the createFormData function:

function createFormData(file) {
    const form = new FormData();
    // Put the file into the 'file' field, the name the server expects
    form.append('file', file);
    return form;
}

fromEvent(input, 'change').pipe(
    switchMap(() => fromEvent(uploadBtn, 'click')),
    map(() => input.files[0]),
    filter(file => !!file),
    map(file => createFormData(file))
).subscribe({
    next: data => upload(data)
});

We will send the form through XMLHttpRequest. We need to create an instance of this object and define the onload and onerror handlers on it. The first fires when the upload is complete, the second when an error occurs.

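function upload(data) {
    const xhr = new XMLHttpRequest();

    // Fires when the request has completed
    xhr.onload = () => console.log('success');

    // Fires when the request fails
    xhr.onerror = e => console.error(e);

    // Configure the request
    xhr.open('POST', '/upload', true);

    // Send the form
    xhr.send(data);
}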

Now we have a working example. But it has a couple of drawbacks:

  • There is no way to cancel the upload
  • If you click the uploadBtn button n times, we get n parallel connections uploading the same file

This happens because the upload function works outside the stream and lives on its own. We need to fix that. Let's make the function return an Observable. Then we can control the upload:

function upload(data) {
    return new Observable(observer => {
        const xhr = new XMLHttpRequest();

        // When the upload finishes, notify the subscriber
        // and complete the stream
        xhr.onload = () => {
            observer.next();
            observer.complete();
        };
        
        xhr.onerror = e => observer.error(e);
        
        xhr.open('POST', '/upload', true);
        xhr.send(data);

        // On unsubscribe, abort the request
        return () => xhr.abort();
    });
}

Note the arrow function returned inside the Observable. It is called on unsubscription and cancels the upload.
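
One caveat with the handlers above: XMLHttpRequest fires load whenever a response arrives, including responses with 4xx and 5xx statuses, while error fires only for network-level failures. A minimal sketch of a status check follows, assuming you want HTTP errors to reach observer.error. The helper name settleOnLoad is hypothetical, and plain objects stand in for xhr and observer so the snippet runs anywhere.

```javascript
// Hypothetical helper meant to be called from xhr.onload.
// `xhr` only needs a numeric `status`; `observer` needs next/complete/error.
function settleOnLoad(xhr, observer) {
    if (xhr.status >= 200 && xhr.status < 300) {
        observer.next();
        observer.complete();
    } else {
        observer.error(new Error(`Upload failed with status ${xhr.status}`));
    }
}

// Usage with plain objects standing in for xhr and observer
const calls = [];
const observer = {
    next: () => calls.push('next'),
    complete: () => calls.push('complete'),
    error: err => calls.push(err.message)
};

settleOnLoad({ status: 200 }, observer);
settleOnLoad({ status: 500 }, observer);
console.log(calls);
```

Inside upload, this would be wired as xhr.onload = () => settleOnLoad(xhr, observer).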

Put the upload call in switchMap:

fromEvent(input, 'change').pipe(
    switchMap(() => fromEvent(uploadBtn, 'click')),
    map(() => input.files[0]),
    filter(file => !!file),
    map(file => createFormData(file)),
    switchMap(data => upload(data))
).subscribe({
    next: () => console.log('File uploaded')
});

Now, if the user clicks the upload button again, the previous request is canceled and a new one is created.

Canceling the request on click


We still have the cancelBtn button. We need to implement canceling the request. The takeUntil operator will help here.

takeUntil reads as “take until”. This operator takes values from the source stream and passes them down the chain as long as the stream passed to it as an argument exists and emits nothing. As soon as that stream emits a value, takeUntil unsubscribes from the source stream.

Before adding the operator, we need to decide which stream to unsubscribe from. We are interested in upload, because we only want to stop the file upload, that is, unsubscribe from the inner stream of switchMap:

fromEvent(input, 'change').pipe(
    switchMap(() => fromEvent(uploadBtn, 'click')),
    map(() => input.files[0]),
    filter(file => !!file),
    map(file => createFormData(file)),
    switchMap(data => upload(data).pipe(
        // Cancel only the upload stream
        takeUntil(fromEvent(cancelBtn, 'click'))
    ))
).subscribe({
    next: () => console.log('File uploaded')
});

Progress bar


It remains to add the progress bar. To track progress, we need to define the xhr.upload.onprogress handler. It is called on every progress event and receives a ProgressEvent object with several properties that are useful to us:

  • lengthComputable - if true, then we know the full file size (in our case, always true)
  • total - total number of bytes
  • loaded - the number of bytes sent
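
In this article lengthComputable is always true, but for the general case the percentage calculation can be guarded. A minimal sketch follows; the helper name toPercent is hypothetical, and plain objects stand in for real ProgressEvent instances.

```javascript
// Hypothetical helper: converts a ProgressEvent-like object into a percentage.
// Returns null when the total size is unknown, so the caller can decide
// what to show (for example, an indeterminate bar).
function toPercent(event) {
    if (!event.lengthComputable || event.total === 0) {
        return null;
    }
    return event.loaded / event.total * 100;
}

console.log(toPercent({ lengthComputable: true, loaded: 256, total: 1024 })); // 25
console.log(toPercent({ lengthComputable: false, loaded: 256, total: 0 }));   // null
```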

Make changes to the upload function:

function upload(data) {
    return new Observable(observer => {
        const xhr = new XMLHttpRequest();

        xhr.upload.onprogress = e => {
            // Calculate the percentage uploaded
            const progress = e.loaded / e.total * 100;
            observer.next(progress);
        };

        xhr.onerror = e => observer.error(e);
        xhr.onload = () => observer.complete();

        xhr.open('POST', '/upload', true);
        xhr.send(data);

        return () => xhr.abort();
    });
}

Now upload spits out the upload status into the stream. It remains only to write a function that will change the style properties of the progressBar element:

function setProgressBarWidth(width) {
    progressBar.style.width = `${width}%`;
}

fromEvent(input, 'change').pipe(
   /* ..
   
   */
).subscribe({
    next: width => setProgressBarWidth(width)
});


A quick tip: when testing locally, files upload almost instantly. To slow the upload down, turn on the “Fast 3G” or “Slow 3G” network throttling preset in Chrome DevTools (available in the “Performance” tab, as well as in the “Network” tab).

Finishing touches


We now have a fully working application. It remains to add a couple of finishing touches. Right now, when you click the uploadBtn button, we cancel the previous upload and start a new one. But we already have a cancel button.

I want the uploadBtn button to ignore subsequent clicks until the file has been uploaded (or until we cancel the upload). What can be done?

You could set the disabled attribute until the upload completes. But there is another option: the exhaustMap operator. This operator ignores new values from the outer stream until the inner stream completes. Replace switchMap with exhaustMap:

exhaustMap(data => upload(data).pipe(
  takeUntil(fromEvent(cancelBtn, 'click'))
))

And now we can consider our application complete. After a little refactoring, we get the final version:

import { fromEvent, Observable } from "rxjs";
import { map, switchMap, filter, takeUntil, exhaustMap } from "rxjs/operators";

const input = document.querySelector('#file');
const uploadBtn = document.querySelector('#upload');
const progressBar = document.querySelector('#progress-bar');
const cancelBtn = document.querySelector('#cancel');

const fromUploadBtn = fromEvent(uploadBtn, 'click');
const fromCancelBtn = fromEvent(cancelBtn, 'click');

fromEvent(input, 'change').pipe(
    switchMap(() => fromUploadBtn),
    map(() => input.files[0]),
    filter(file => !!file),
    map(file => createFormData(file)),
    exhaustMap(data => upload(data).pipe(
        takeUntil(fromCancelBtn)
    ))
).subscribe({
    next: width => setProgressBarWidth(width)
});

function setProgressBarWidth(width) {
    progressBar.style.width = `${width}%`;
}

function createFormData(file) {
    const form = new FormData();
    form.append('file', file);
    return form;
}

function upload(data) {
    return new Observable(observer => {
        const xhr = new XMLHttpRequest();

        xhr.upload.onprogress = e => {
            const progress = e.loaded / e.total * 100;
            observer.next(progress);
        };

        xhr.onerror = e => observer.error(e);
        xhr.onload = () => observer.complete();

        xhr.open('POST', '/upload', true);
        xhr.send(data);

        return () => xhr.abort();
    });
}

I posted my version here.

Angular and HttpClient


If you work with Angular, you do not need to use xhr directly. Angular has the HttpClient service, which can track upload and download progress. To enable this, pass the following options to the post method:

  • reportProgress: true - receive upload / download progress information
  • observe: "events" - indicate that we want to receive HttpEvents from the stream

Here's what the upload method in Angular will look like:

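import { Injectable } from '@angular/core';
import { HttpClient, HttpEventType, HttpProgressEvent } from '@angular/common/http';
import { Observable } from 'rxjs';
import { filter, map } from 'rxjs/operators';

@Injectable({ providedIn: 'root' })
export class UploaderService {
  constructor(private http: HttpClient) { }

  public upload(data: FormData): Observable<number> {
    return this.http.post('/upload', data, { reportProgress: true, observe: 'events' })
      .pipe(
        // Keep only upload progress events
        filter(event => event.type === HttpEventType.UploadProgress),
        // Cast to HttpProgressEvent to access loaded and total
        map(event => event as HttpProgressEvent),
        // total is optional on HttpProgressEvent; report 0 while it is unknown
        map(event => event.total ? event.loaded / event.total * 100 : 0)
      );
  }
}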

The filter operator lets through only upload progress events. Other events do not interest us. Next, we cast the event to HttpProgressEvent in order to access the loaded and total properties. Finally, we calculate the percentage.

HttpClient is just a wrapper over xhr that saves us from boilerplate and makes working with HTTP easier.

An example application on Angular can be found here.

Conclusion


RxJS is a very powerful tool in the hands of a developer. Its arsenal includes a huge set of operators for all occasions. Unfortunately, because of this, the entry threshold for the technology is quite high. And people often end up reinventing the wheel without realizing it, which makes the code difficult to maintain.

Therefore, I would like to encourage all readers not to stand still and not to be afraid to experiment. Learn RxJS. You may well come across an operator that turns 10 lines of code into one, or makes the code a little clearer.

Good luck

Source: https://habr.com/ru/post/undefined/

