使用RxJS交互式将文件上传到服务器



自从我上一篇关于RxJS基础的文章以来已经有很长时间了。在评论中,我被要求显示可能在实践中有用的更复杂的示例。因此,我决定稍微简化一下理论,今天我们将讨论上传文件。

我们做什么?

  • 我们将编写一个小页面,用户可以在其中选择文件以将其上传到服务器
  • 添加进度条,以便显示文件上传进度。
  • 通过单击取消按钮来添加取消下载的功能

要理解本文,您需要基本的RxJS知识。什么是Observable运算符以及HOO运算符

我们不会将猫拉到尾巴,立即开始做生意!

训练


首先,我们需要一个可以接受文件下载请求的服务器。任何服务器都适用于此,本文将结合使用node.js和express和multer

const express = require("express");
const multer  = require("multer");

const app = express();
const upload = multer({ dest:"files" });

app.post("/upload", upload.single("file"), function (req, res) {
    const { file } = req;

    if (file) {
        res.send("File uploaded successfully");
    } else {
        res.error("Error");
    }

});

app.listen(3000);

现在创建一个html页面,我们将在其中放置所有必要的元素:

<!doctype html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport"
          content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>File uploading</title>
</head>
<body>
    <label for="file">load file: <input id="file" type="file"></label>
    <button id="upload">Upload</button>
    <button id="cancel">Cancel</button>
    <div id="progress-bar" style="width: 0; height: 2rem; background-color: aquamarine;"></div>
    <script src="index.js" type="text/javascript"></script>
</body>
</html>

现在,在页面上,我们有四个与用户进行交互的元素:

  • 输入文件类型,以便用户可以选择要上传的文件
  • 上载按钮,单击后我们将开始上载
  • 取消按钮,将取消下载
  • 进度条的起始宽度为0。在上传过程中,我们将更改其宽度

在body标签的最后,我添加了一个指向index.js脚本的链接,我们还需要创建该链接:

//   ,     
const input = document.querySelector('#file');
const uploadBtn = document.querySelector('#upload');
const progressBar = document.querySelector('#progress-bar');
const cancelBtn = document.querySelector('#cancel');

它看起来应该像这样:



一切都是流动


要告诉浏览器选择哪个文件,用户必须单击“选择文件”按钮。之后,将打开操作系统的对话框,其中将显示文件夹树。选择文件后,浏览器将下载有关该文件的所有必要信息。

我们如何理解用户已选择文件?为此有一个“更改”事件。触发事件后,我们可以转到输入中的files数组,将在其中写入文件数据。

我们如何收听变更事件?您可以使用addEventListener方法并使用它。但是我们使用RxJS,其中任何事件都可以表示为流:

fromEvent(input, 'change').pipe(
    //    
    map(() => input.files[0])
).subscribe({
    next: data => console.log(file)
});

添加上载功能,该功能会将文件上载到服务器。现在,让她的身体空着:

function upload(file) {
  console.log(file);
}

单击uploadBtn按钮后应调用上载函数:

fromEvent(uploadBtn, 'click').subscribe({
  next: () => upload(input.files[0])
});

合并流


现在我们的代码与使用addEventListener编写的代码没有什么不同。是的,它可以工作,但是如果我们这样保留它,那么我们将失去RxJS提供给我们的优势。

我们能做什么?我们将描述上传文件的步骤顺序:

  • 文件选择
  • 点击uploadBtn
  • 从input.files中提取文件
  • 上传文件

现在,我们将此序列转移到代码中。但是如何结合输入流和uploadBtn流?switchMap运算符将为我们提供帮助,这使我们可以将一个线程投射到另一个线程上:

fromEvent(input, 'change').pipe(
    switchMap(() => fromEvent(uploadBtn, 'click')),
    map(() => input.files[0])
).subscribe({
    next: file => upload(file)
});

该代码与我们上面描述的指令序列非常相似。用户选择一个文件,switchMap被触发,我们订阅了uploadBtn。但是随后什么也不会发生。

switchMap只允许将fromEvent(uploadBtn,'click')生成的值传递给外部流。要开始上传文件,您需要执行第二条指令,即单击uploadBtn。然后,它将计算map方法,该方法将从数组中提取文件,并且subscribe中已经调用了upload方法。

这里最有趣的是,指令序列没有中断。为了使上传功能正常工作,必须先触发“更改”事件。

但是,仍然存在一个问题。用户可以选择一个文件,然后取消选择它。然后,当我们尝试上传文件时,我们传入了上传功能-未定义。为了避免这种情况,我们应该添加检查:

fromEvent(input, 'change').pipe(
    switchMap(() => fromEvent(uploadBtn, 'click')),
    map(() => input.files[0]),
    filter(file => !!file)
).subscribe({
    next: file => upload(file)
});

使用xhr


是时候实施最困难的工作了-卸载过程。我将在使用xhr的示例中展示它,因为在撰写本文时,提取并不知道如何跟踪上传进度
您可以使用任何其他库(例如axios或jQuery.ajax)来实现卸载。


由于我在服务器端使用multer,因此我将不得不在表单内传输文件(multer仅接受此格式的数据)。为此,我编写了createFormData函数:

function createFormData(file) {
    const form = new FormData();
    //       file
    form.append('file', file);
    return form;
}

fromEvent(input, 'change').pipe(
    switchMap(() => fromEvent(uploadBtn, 'click')),
    map(() => input.files[0]),
    filter(file => !!file),
    map(file => createFormData(file))
).subscribe({
    next: data => upload(data)
});

我们将通过XMLHttpRequest卸载表单。我们需要创建该对象的实例,并在其上定义unload和onerror方法。上传完成后,第一个将触发,第二个错误发生时,将触发第二个。

function upload(data) {
    const xhr = new XMLHttpRequest();

    //        
    xhr.onload = () => console.log('success');

    //    
    xhr.onerror = e => console.error(e);

    //  
    xhr.open('POST', '/upload', true);

    //  
    xhr.send(data);
}

现在我们有一个可行的例子。但是它有两个缺点:

  • 无法取消下载
  • 如果您单击n次uploadBtn按钮,那么我们将有n个并行连接来上传一个文件

这是因为上载功能无法使用。她独自生活。我们需要修理它。让我们让函数将Observable返回给我们。然后,我们可以控制文件的上传:

function upload(data) {
    return new Observable(observer => {
        const xhr = new XMLHttpRequest();

        //    ,      
        //   
        xhr.onload = () => {
            observer.next();
            observer.complete();
        };
        
        xhr.onerror = e => observer.error(e);
        
        xhr.open('POST', '/upload', true);
        xhr.send(data);

        //   -  
        return () => xhr.abort();
    });
}

注意Observable内部返回的箭头函数。取消订阅并取消上传时将调用此方法。

将上载呼叫放在switchMap中:

fromEvent(input, 'change').pipe(
    switchMap(() => fromEvent(uploadBtn, 'click')),
    map(() => input.files[0]),
    filter(file => !!file),
    map(file => createFormData(file)),
    switchMap(data => upload(data))
).subscribe({
    next: () => console.log('File uploaded')
});

现在,如果用户再次单击上载按钮,则先前的请求将被取消,但是将创建一个新请求。

取消点击请求


我们仍然有calcelBtn按钮。我们必须取消请求。takeUntil运算符将在此提供帮助。

takeUntil转换为“ take bye”。该运算符从外部流获取值并将其提供给更远的链。只要内部线程存在并且什么都不产生。一旦内部线程生成一个值,takeUntil将调用unsubscribe方法并从外部线程取消订阅。

在添加运算符之前,我们需要确定要取消订阅的流。我们对上传感兴趣,因为仅需要完成文件的上传,即 退订内部线程:

fromEvent(input, 'change').pipe(
    switchMap(() => fromEvent(uploadBtn, 'click')),
    map(() => input.files[0]),
    filter(file => !!file),
    map(file => createFormData(file)),
    switchMap(data => upload(data).pipe(
        //    upload
        takeUntil(fromEvent(cancelBtn, 'click'))
    ))
).subscribe({
    next: () => console.log('File uploaded')
});

进度条


仍然要添加进度条。要跟踪进度,我们需要定义xhr.upload.onprogress方法。当ProgressEvent事件发生时,将调用此方法。事件对象包含一些对我们有用的属性:

  • lengthComputable-如果为true,则我们知道完整的文件大小(在我们的情况下,始终为true)
  • 总计-字节总数
  • 已加载-已发送的字节数

更改上传功能:

function upload(data) {
    return new Observable(observer => {
        const xhr = new XMLHttpRequest();

        xhr.upload.onprogress = e => {
            //  
            const progress = e.loaded / e.total * 100;
            observer.next(progress);
        };

        xhr.onerror = e => observer.error(e);
        xhr.onload = () => observer.complete();

        xhr.open('POST', '/upload', true);
        xhr.send(data);

        return () => xhr.abort();
    });
}

现在,upload将上传状态吐出到流中。只需编写一个将更改progressBar元素的样式属性的函数即可:

function setProgressBarWidth(width) {
    progressBar.style.width = `${width}%`;
}

fromEvent(input, 'change').pipe(
   /* ..
   
   */
).subscribe({
    next: width => setProgressBarWidth(width)
});


快速提示:为了避免您的文件在本地这么快地上传,请打开Chrome devtools“性能”标签中的“快速3G”或“慢速3G”设置。

回忆起


我们有一个完整的工作申请表。它仍然需要增加一些笔触。现在,当您单击uploadBtn按钮时,我们将取消先前的上传并开始新的上传。但是我们已经有一个取消按钮。

我希望uploadBtn按钮在我们上传文件(或取消我们的上传)之前不响应后续点击。该怎么办?

您可以挂起disable属性,直到上传过程完成。但是还有另一种选择-exhaustMap运算符。该语句将忽略来自外部线程的新值,直到内部线程完成为止。将switchMap替换为exhaustMap:

exhaustMap(data => upload(data).pipe(
  takeUntil(fromEvent(cancelBtn, 'click'))
))

现在我们可以认为我们的应用程序已经完成。进行一些重构并获得最终版本:

import { fromEvent, Observable } from "rxjs";
import { map, switchMap, filter, takeUntil, exhaustMap } from "rxjs/operators";

const input = document.querySelector('#file');
const uploadBtn = document.querySelector('#upload');
const progressBar = document.querySelector('#progress-bar');
const cancelBtn = document.querySelector('#cancel');

const fromUploadBtn = fromEvent(uploadBtn, 'click');
const fromCancelBtn = fromEvent(cancelBtn, 'click');

fromEvent(input, 'change').pipe(
    switchMap(() => fromUploadBtn),
    map(() => input.files[0]),
    filter(file => !!file),
    map(file => createFormData(file)),
    exhaustMap(data => upload(data).pipe(
        takeUntil(fromCancelBtn)
    ))
).subscribe({
    next: width => setProgressBarWidth(width)
});

function setProgressBarWidth(width) {
    progressBar.style.width = `${width}%`;
}

function createFormData(file) {
    const form = new FormData();
    form.append('file', file);
    return form;
}

function upload(data) {
    return new Observable(observer => {
        const xhr = new XMLHttpRequest();

        xhr.upload.onprogress = e => {
            const progress = e.loaded / e.total * 100;
            observer.next(progress);
        };

        xhr.onerror = e => observer.error(e);
        xhr.onload = () => observer.complete();

        xhr.open('POST', '/upload', true);
        xhr.send(data);

        return () => xhr.abort();
    });
}

我在这里发布了我的版本

Angular和HttpClient


如果使用Angular,则不需要直接使用xhr。Angular具有HttpClient服务。该服务可以跟踪加载/卸载的进度,为此,将以下参数传递给post方法就足够了:

  • reportProgress:true-获取上载/下载信息
  • 观察:“事件”-表示我们要从流中接收HttpEvent

Angular中的上载方法如下所示:

export class UploaderService {
  constructor(private http: HttpClient) { }

  public upload(data: FormData): Observable<number> {
    return this.http.post('/upload', data, { reportProgress: true, observe: 'events' })
      .pipe(
        filter(event => event.type === HttpEventType.UploadProgress),
        map(event => event as HttpProgressEvent),
        map(event => event.loaded / event.total * 100)
      );
  }
}

filter语句仅过滤出上载事件。其他事件使我们不感兴趣。此外,我们将事件带到HttpProgressEvent以便访问已加载的属性和total属性。我们考虑百分比。

HttpClient只是xhr的包装,它使我们摆脱了样板,并简化了HTTP的使用。

角上的一个示例应用程序,可以发现在这里

结论


RxJS在开发人员手中是一个非常强大的工具。在他的军械库中,各种场合都有大量的操作员。不幸的是,因此,进入这项技术的门槛很高。通常,人们在不知不觉中就开始编写“自行车”,这使得代码难以维护。

因此,我希望所有读者不要停滞不前,不要害怕尝试。了解RxJS。突然,您遇到了一个运算符,该运算符可以将10行代码转换为1行。或有助于使代码更清晰。

祝好运

Source: https://habr.com/ru/post/undefined/


All Articles