Content Table

任务队列、异步并发

使用任务队列异步并发且限制并发数量的主要控制流程,这个模型在大文件分片上传时能够用到。

框架代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// 异步、并发、最大并发数执行任务。
const MAX_EXECUTING_TASK_COUNT = 3; // 任务执行时的最大并发数
let taskQueue = []; // 任务队列

let totalTaskCount = 0; // 总的任务数量
let finishedTaskCount = 0; // 已完成的任务数量
let executingTaskCount = 0; // 正在执行的任务数量

start();

// 开始执行。
function start() {
buildTaskQueue();
totalTaskCount = taskQueue.length;

// 开启多个任务并发执行。
for (let i = 0; i < MAX_EXECUTING_TASK_COUNT; i++) {
startTask();
}
}

// 构建任务队列,例如从服务器获取任务信息。
function buildTaskQueue() {
taskQueue = [1, 2, 3, 4, 5, 6, 7, 8, 9];
}

/**
* 开始任务。
*
* @returns 无。
*/
function startTask() {
/*
逻辑:
1. 如果超过允许的最大并发任务数则不开启新的任务。
2. 如果任务队列为空则不开启新的任务。
3. 从任务队列里获取一个任务然后执行:
3.1 正在执行的任务数 +1。
3.2 使用 Promise 执行异步耗时任务。
4. 每个任务执行结束后调用 onTaskFinish(),在其中决定执行新的任务还是任务所有任务都执行结束。
*/

// [1] 如果超过允许的最大并发任务数则不开启新的任务。
if (executingTaskCount >= MAX_EXECUTING_TASK_COUNT) {
return;
}
// [2] 如果任务队列为空则不开启新的任务。
if (taskQueue.length == 0) {
return;
}

// [3] 从任务队列里获取一个任务然后执行:
// [3.1] 正在执行的任务数 +1。
executingTaskCount++;
let t = taskQueue.shift();
let delay = parseInt(200 + Math.random() * 2000);

console.log(`[开始] 执行任务: ${t}, 耗时: ${delay} 毫秒`);

// [3.2] 使用 Promise 执行异步耗时任务。
let p = new Promise((resolve, reject) => {
// 传输文件 (异步耗时任务)
setTimeout(()=>{
resolve(t);
}, delay);
});

// [4] 每个任务执行结束后调用 onTaskFinish(),在其中决定执行新的任务还是任务所有任务都执行结束。
p.then((r) => {
onTaskFinish(t);
}).catch(err => {
onTaskFinish(t);
console.err(err);
})
}

/**
* 任务结束 (成功或失败)。
*
* @param {Json} task 任务。
* @returns 无。
*/
function onTaskFinish(task) {
console.log(`[完成] 执行任务: ${task}, 剩下任务数: ${taskQueue.length}`);
finishedTaskCount++;
executingTaskCount--;

if (finishedTaskCount == totalTaskCount) {
// 任务都执行结束。
console.log("所有任务执行完成 ✔️✔️✔️");
} else {
// 一个任务结束,执行新的任务。
startTask();
}
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[开始] 执行任务: 1, 耗时: 1605 毫秒
[开始] 执行任务: 2, 耗时: 266 毫秒
[开始] 执行任务: 3, 耗时: 1791 毫秒
[完成] 执行任务: 2, 剩下任务数: 6
[开始] 执行任务: 4, 耗时: 267 毫秒
[完成] 执行任务: 4, 剩下任务数: 5
[开始] 执行任务: 5, 耗时: 712 毫秒
[完成] 执行任务: 5, 剩下任务数: 4
[开始] 执行任务: 6, 耗时: 1698 毫秒
[完成] 执行任务: 1, 剩下任务数: 3
[开始] 执行任务: 7, 耗时: 855 毫秒
[完成] 执行任务: 3, 剩下任务数: 2
[开始] 执行任务: 8, 耗时: 994 毫秒
[完成] 执行任务: 7, 剩下任务数: 1
[开始] 执行任务: 9, 耗时: 1569 毫秒
[完成] 执行任务: 8, 剩下任务数: 0
[完成] 执行任务: 6, 剩下任务数: 0
[完成] 执行任务: 9, 剩下任务数: 0
所有任务执行完成 ✔️✔️✔️