Commit c77160c5 authored by wang's avatar wang

并发

parent 175ada9b
import {Worker} from 'worker_threads';
//主线程
export class ThreadPool {
size = 5;
queue = [];
workerGroup = [];
free = 0;
maxFree = 2;
monitor = null;
constructor(size) {
this.size = size;
}
//初始化子线程
init() {
for (let i = 0; i < this.size; i++) {
this.workerGroup.push({
id: i,
status: false,
worker: new Worker('./run.js')
});
this.workerGroup[i].worker.on("message", (message) => {
if (message === 'end') {
this.workerGroup[i].status = false;
this.check();
}
});
}
this.monitor = setInterval(() => {
if (this.isFree()) {
this.free++;
console.log(`空闲次数: ${this.free},如果超过${this.maxFree}次,线程池将关闭,后续提交任务将自动开启`)
} else {
this.free = 0;
}
this.check();
if (this.free > this.maxFree) {
this.shutdown();
clearInterval(this.monitor);
this.monitor = null;
this.workerGroup = [];
this.free = 0;
}
}, 10000)
}
isFree() {
for (let i = 0; i < this.workerGroup.length; i++) {
if (this.workerGroup[i].status) {
return false;
}
}
if (this.queue.length > 0) {
return false;
}
return true;
}
//清理所有子线程
shutdown() {
this.workerGroup.forEach(e => {
e.worker.terminate();
});
}
/**
* 提交异步任务
* @param {*} taskContext 任务函数
* @param {*} data 任务所需参数
*/
submitAsync(taskContext, data) {
this.add(true, taskContext, data)
}
/**
* 提交同步任务
* @param {*} taskContext 任务函数
* @param {*} data 任务所需参数
*/
submit(taskContext, data) {
this.add(false, taskContext, data)
}
//添加任务排队
add(isAsync, taskContext, data) {
if (this.workerGroup.length < 1) this.init();//懒加载
this.queue.push({
isAsync: isAsync,
data: data,
taskContext: taskContext.toString()
});
this.check();
}
//检查任务
check() {
if (this.queue.length > 0) {
for (let i = 0; i < this.workerGroup.length; i++) {
if (!this.workerGroup[i].status) {
this.workerGroup[i].status = true;
this.workerGroup[i].worker.postMessage(this.queue.pop());
break;
}
}
}
}
}
import {ThreadPool} from "./ThreadPool.js";
const pool = new ThreadPool(2);
let data = { name: '你好' }
// 异步任务
pool.submitAsync(end => {
setTimeout(() => {
console.log("任务一开始" + new Date().getTime());
let num = 0;
while (true) {
num++;
if (num > 100000000) {
console.log("任务一结束" + new Date().getTime());
break;
}
}
end();
}, 1000);
});
//异步任务+入参
pool.submitAsync((end, data1) => {
setTimeout(() => {
let num = 0;
console.log("任务二开始" + new Date().getTime() + ",data=" + JSON.stringify(data1));
while (true) {
num++;
if (num > 100000000) {
console.log("任务二结束" + new Date().getTime());
break;
}
}
end();
}, 1000);
}, data);
//同步任务
pool.submit(data1 => {
console.log("任务一开始" + new Date().getTime() + ",data" + JSON.stringify(data1));
let num = 0;
while (true) {
num++;
if (num > 100000000) {
console.log("任务一结束" + new Date().getTime());
break;
}
}
}, data);
//同步任务+入参
pool.submit(data1 => {
console.log("任务二开始" + new Date().getTime() + ",data" + JSON.stringify(data1));
let num = 0;
while (true) {
num++;
if (num > 100000000) {
console.log("任务二结束" + new Date().getTime());
break;
}
}
}, data);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment