自从我上一篇关于RxJS基础的文章以来已经有很长时间了。在评论中,我被要求显示可能在实践中有用的更复杂的示例。因此,我决定稍微简化一下理论,今天我们将讨论上传文件。我们做什么?- 我们将编写一个小页面,用户可以在其中选择文件以将其上传到服务器
- 添加进度条,以便显示文件上传进度。
- 通过单击取消按钮来添加取消下载的功能
要理解本文,您需要基本的RxJS知识。什么是Observable,运算符以及HOO运算符
我们不会将猫拉到尾巴,立即开始做生意!训练
首先,我们需要一个可以接受文件下载请求的服务器。任何服务器都适用于此,本文将结合使用node.js和express和multer:const express = require("express");
const multer = require("multer");
const app = express();
const upload = multer({ dest:"files" });
app.post("/upload", upload.single("file"), function (req, res) {
const { file } = req;
if (file) {
res.send("File uploaded successfully");
} else {
res.error("Error");
}
});
app.listen(3000);
现在创建一个html页面,我们将在其中放置所有必要的元素:<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>File uploading</title>
</head>
<body>
<label for="file">load file: <input id="file" type="file"></label>
<button id="upload">Upload</button>
<button id="cancel">Cancel</button>
<div id="progress-bar" style="width: 0; height: 2rem; background-color: aquamarine;"></div>
<script src="index.js" type="text/javascript"></script>
</body>
</html>
现在,在页面上,我们有四个与用户进行交互的元素:- 输入文件类型,以便用户可以选择要上传的文件
- 上载按钮,单击后我们将开始上载
- 取消按钮,将取消下载
- 进度条的起始宽度为0。在上传过程中,我们将更改其宽度
在body标签的最后,我添加了一个指向index.js脚本的链接,我们还需要创建该链接:
const input = document.querySelector('#file');
const uploadBtn = document.querySelector('#upload');
const progressBar = document.querySelector('#progress-bar');
const cancelBtn = document.querySelector('#cancel');
它看起来应该像这样:
一切都是流动
要告诉浏览器选择哪个文件,用户必须单击“选择文件”按钮。之后,将打开操作系统的对话框,其中将显示文件夹树。选择文件后,浏览器将下载有关该文件的所有必要信息。我们如何理解用户已选择文件?为此有一个“更改”事件。触发事件后,我们可以转到输入中的files数组,将在其中写入文件数据。我们如何收听变更事件?您可以使用addEventListener方法并使用它。但是我们使用RxJS,其中任何事件都可以表示为流:fromEvent(input, 'change').pipe(
map(() => input.files[0])
).subscribe({
next: data => console.log(file)
});
添加上载功能,该功能会将文件上载到服务器。现在,让她的身体空着:function upload(file) {
console.log(file);
}
单击uploadBtn按钮后应调用上载函数:fromEvent(uploadBtn, 'click').subscribe({
next: () => upload(input.files[0])
});
合并流
现在我们的代码与使用addEventListener编写的代码没有什么不同。是的,它可以工作,但是如果我们这样保留它,那么我们将失去RxJS提供给我们的优势。我们能做什么?我们将描述上传文件的步骤顺序:- 文件选择
- 点击uploadBtn
- 从input.files中提取文件
- 上传文件
现在,我们将此序列转移到代码中。但是如何结合输入流和uploadBtn流?switchMap运算符将为我们提供帮助,这使我们可以将一个线程投射到另一个线程上:fromEvent(input, 'change').pipe(
switchMap(() => fromEvent(uploadBtn, 'click')),
map(() => input.files[0])
).subscribe({
next: file => upload(file)
});
该代码与我们上面描述的指令序列非常相似。用户选择一个文件,switchMap被触发,我们订阅了uploadBtn。但是随后什么也不会发生。switchMap只允许将fromEvent(uploadBtn,'click')生成的值传递给外部流。要开始上传文件,您需要执行第二条指令,即单击uploadBtn。然后,它将计算map方法,该方法将从数组中提取文件,并且subscribe中已经调用了upload方法。这里最有趣的是,指令序列没有中断。为了使上传功能正常工作,必须先触发“更改”事件。但是,仍然存在一个问题。用户可以选择一个文件,然后取消选择它。然后,当我们尝试上传文件时,我们传入了上传功能-未定义。为了避免这种情况,我们应该添加检查:fromEvent(input, 'change').pipe(
switchMap(() => fromEvent(uploadBtn, 'click')),
map(() => input.files[0]),
filter(file => !!file)
).subscribe({
next: file => upload(file)
});
使用xhr
是时候实施最困难的工作了-卸载过程。我将在使用xhr的示例中展示它,因为在撰写本文时,提取并不知道如何跟踪上传进度。您可以使用任何其他库(例如axios或jQuery.ajax)来实现卸载。
由于我在服务器端使用multer,因此我将不得不在表单内传输文件(multer仅接受此格式的数据)。为此,我编写了createFormData函数:function createFormData(file) {
const form = new FormData();
form.append('file', file);
return form;
}
fromEvent(input, 'change').pipe(
switchMap(() => fromEvent(uploadBtn, 'click')),
map(() => input.files[0]),
filter(file => !!file),
map(file => createFormData(file))
).subscribe({
next: data => upload(data)
});
我们将通过XMLHttpRequest卸载表单。我们需要创建该对象的实例,并在其上定义unload和onerror方法。上传完成后,第一个将触发,第二个错误发生时,将触发第二个。function upload(data) {
const xhr = new XMLHttpRequest();
xhr.onload = () => console.log('success');
xhr.onerror = e => console.error(e);
xhr.open('POST', '/upload', true);
xhr.send(data);
}
现在我们有一个可行的例子。但是它有两个缺点:- 无法取消下载
- 如果您单击n次uploadBtn按钮,那么我们将有n个并行连接来上传一个文件
这是因为上载功能无法使用。她独自生活。我们需要修理它。让我们让函数将Observable返回给我们。然后,我们可以控制文件的上传:function upload(data) {
return new Observable(observer => {
const xhr = new XMLHttpRequest();
xhr.onload = () => {
observer.next();
observer.complete();
};
xhr.onerror = e => observer.error(e);
xhr.open('POST', '/upload', true);
xhr.send(data);
return () => xhr.abort();
});
}
注意Observable内部返回的箭头函数。取消订阅并取消上传时将调用此方法。将上载呼叫放在switchMap中:fromEvent(input, 'change').pipe(
switchMap(() => fromEvent(uploadBtn, 'click')),
map(() => input.files[0]),
filter(file => !!file),
map(file => createFormData(file)),
switchMap(data => upload(data))
).subscribe({
next: () => console.log('File uploaded')
});
现在,如果用户再次单击上载按钮,则先前的请求将被取消,但是将创建一个新请求。取消点击请求
我们仍然有calcelBtn按钮。我们必须取消请求。takeUntil运算符将在此提供帮助。takeUntil转换为“ take bye”。该运算符从外部流获取值并将其提供给更远的链。只要内部线程存在并且什么都不产生。一旦内部线程生成一个值,takeUntil将调用unsubscribe方法并从外部线程取消订阅。在添加运算符之前,我们需要确定要取消订阅的流。我们对上传感兴趣,因为仅需要完成文件的上传,即 退订内部线程:fromEvent(input, 'change').pipe(
switchMap(() => fromEvent(uploadBtn, 'click')),
map(() => input.files[0]),
filter(file => !!file),
map(file => createFormData(file)),
switchMap(data => upload(data).pipe(
takeUntil(fromEvent(cancelBtn, 'click'))
))
).subscribe({
next: () => console.log('File uploaded')
});
进度条
仍然要添加进度条。要跟踪进度,我们需要定义xhr.upload.onprogress方法。当ProgressEvent事件发生时,将调用此方法。事件对象包含一些对我们有用的属性:- lengthComputable-如果为true,则我们知道完整的文件大小(在我们的情况下,始终为true)
- 总计-字节总数
- 已加载-已发送的字节数
更改上传功能:function upload(data) {
return new Observable(observer => {
const xhr = new XMLHttpRequest();
xhr.upload.onprogress = e => {
const progress = e.loaded / e.total * 100;
observer.next(progress);
};
xhr.onerror = e => observer.error(e);
xhr.onload = () => observer.complete();
xhr.open('POST', '/upload', true);
xhr.send(data);
return () => xhr.abort();
});
}
现在,upload将上传状态吐出到流中。只需编写一个将更改progressBar元素的样式属性的函数即可:function setProgressBarWidth(width) {
progressBar.style.width = `${width}%`;
}
fromEvent(input, 'change').pipe(
).subscribe({
next: width => setProgressBarWidth(width)
});

快速提示:为了避免您的文件在本地这么快地上传,请打开Chrome devtools“性能”标签中的“快速3G”或“慢速3G”设置。
回忆起
我们有一个完整的工作申请表。它仍然需要增加一些笔触。现在,当您单击uploadBtn按钮时,我们将取消先前的上传并开始新的上传。但是我们已经有一个取消按钮。我希望uploadBtn按钮在我们上传文件(或取消我们的上传)之前不响应后续点击。该怎么办?您可以挂起disable属性,直到上传过程完成。但是还有另一种选择-exhaustMap运算符。该语句将忽略来自外部线程的新值,直到内部线程完成为止。将switchMap替换为exhaustMap:exhaustMap(data => upload(data).pipe(
takeUntil(fromEvent(cancelBtn, 'click'))
))
现在我们可以认为我们的应用程序已经完成。进行一些重构并获得最终版本:import { fromEvent, Observable } from "rxjs";
import { map, switchMap, filter, takeUntil, exhaustMap } from "rxjs/operators";
const input = document.querySelector('#file');
const uploadBtn = document.querySelector('#upload');
const progressBar = document.querySelector('#progress-bar');
const cancelBtn = document.querySelector('#cancel');
const fromUploadBtn = fromEvent(uploadBtn, 'click');
const fromCancelBtn = fromEvent(cancelBtn, 'click');
fromEvent(input, 'change').pipe(
switchMap(() => fromUploadBtn),
map(() => input.files[0]),
filter(file => !!file),
map(file => createFormData(file)),
exhaustMap(data => upload(data).pipe(
takeUntil(fromCancelBtn)
))
).subscribe({
next: width => setProgressBarWidth(width)
});
function setProgressBarWidth(width) {
progressBar.style.width = `${width}%`;
}
function createFormData(file) {
const form = new FormData();
form.append('file', file);
return form;
}
function upload(data) {
return new Observable(observer => {
const xhr = new XMLHttpRequest();
xhr.upload.onprogress = e => {
const progress = e.loaded / e.total * 100;
observer.next(progress);
};
xhr.onerror = e => observer.error(e);
xhr.onload = () => observer.complete();
xhr.open('POST', '/upload', true);
xhr.send(data);
return () => xhr.abort();
});
}
我在这里发布了我的版本。Angular和HttpClient
如果使用Angular,则不需要直接使用xhr。Angular具有HttpClient服务。该服务可以跟踪加载/卸载的进度,为此,将以下参数传递给post方法就足够了:- reportProgress:true-获取上载/下载信息
- 观察:“事件”-表示我们要从流中接收HttpEvent
Angular中的上载方法如下所示:export class UploaderService {
constructor(private http: HttpClient) { }
public upload(data: FormData): Observable<number> {
return this.http.post('/upload', data, { reportProgress: true, observe: 'events' })
.pipe(
filter(event => event.type === HttpEventType.UploadProgress),
map(event => event as HttpProgressEvent),
map(event => event.loaded / event.total * 100)
);
}
}
filter语句仅过滤出上载事件。其他事件使我们不感兴趣。此外,我们将事件带到HttpProgressEvent以便访问已加载的属性和total属性。我们考虑百分比。HttpClient只是xhr的包装,它使我们摆脱了样板,并简化了HTTP的使用。角上的一个示例应用程序,可以发现在这里。结论
RxJS在开发人员手中是一个非常强大的工具。在他的军械库中,各种场合都有大量的操作员。不幸的是,因此,进入这项技术的门槛很高。通常,人们在不知不觉中就开始编写“自行车”,这使得代码难以维护。因此,我希望所有读者不要停滞不前,不要害怕尝试。了解RxJS。突然,您遇到了一个运算符,该运算符可以将10行代码转换为1行。或有助于使代码更清晰。祝好运