frontend/.pnpm-store/v3/files/e2/e5302ce2985a6f97cd897e20a48c249322ba6ca000c63fed5ff8dee08a8927c055004d94c2d3a6bf69b69826e8a8d7c0268feecf4aa9c476f3b25cecf5a854

151 lines
3.5 KiB
Plaintext

module.exports = workerMgr;
var eom = "\x03";
var eom1 = "\x0e";
var eom2 = "\x0f";
var CSVError = require('./CSVError');
function workerMgr() {
var exports = {
initWorker: initWorker,
sendWorker: sendWorker,
setParams: setParams,
drain: function(){},
isRunning: isRunning,
destroyWorker: destroyWorker
};
var workers = [];
var running = 0;
var waiting = null;
function initWorker(num, params) {
workers = [];
running = 0;
waiting = null;
for (var i = 0; i < num; i++) {
workers.push(new Worker(params));
}
}
function isRunning() {
return running > 0;
}
function destroyWorker() {
workers.forEach(function(w) {
w.destroy();
});
}
function sendWorker(data, startIdx, transformCb, cbResult) {
if (workers.length > 0) {
var worker = workers.shift();
running++;
worker.parse(data, startIdx, function(result) {
// var arr=JSON.parse(result);
// arr.forEach(function(item){
// console.log('idx',item.index)
// })
workers.push(worker);
cbResult(result, startIdx);
running--;
if (waiting === null && running === 0) {
exports.drain();
} else if (waiting) {
sendWorker.apply(this, waiting);
waiting = null;
}
});
process.nextTick(transformCb);
} else {
waiting = [data, startIdx, transformCb, cbResult];
}
}
function setParams(params) {
workers.forEach(function(w) {
w.setParams(params);
});
}
return exports;
}
function Worker(params) {
var spawn = require("child_process").spawn;
this.cp = spawn(process.execPath, [__dirname + "/worker.js"], {
env: {
child:true
},
stdio:['pipe', 'pipe', 2, 'ipc']
// stdio:[0,1,2,'ipc']
});
this.setParams(params);
this.cp.on("message", this.onChildMsg.bind(this));
this.buffer = "";
var self = this;
this.cp.stdout.on("data", function(d) {
var str = d.toString("utf8");
var all = self.buffer + str;
var cmdArr = all.split(eom);
while (cmdArr.length > 1) {
self.onChildMsg(cmdArr.shift());
}
self.buffer = cmdArr[0];
});
}
Worker.prototype.setParams = function(params) {
var msg = "0" + JSON.stringify(params);
this.sendMsg(msg);
};
/**
* msg is like:
* <cmd><data>
* cmd is from 0-9
*/
Worker.prototype.onChildMsg = function(msg) {
if (msg) {
var cmd = msg[0];
var data = msg.substr(1);
switch (cmd) {
case "0": //total line number of current chunk
if (this.cbLine) {
var sp = data.split("|");
var len = parseInt(sp[0]);
var partial = sp[1];
this.cbLine(len, partial);
}
break;
case "1": // json array of current chunk
if (this.cbResult) {
var rows = data.split(eom1);
rows.pop();
var res = [];
rows.forEach(function(row) {
var sp = row.split(eom2);
res.push({
index: sp[0],
row: sp[1],
err: sp[2] ? CSVError.fromArray(JSON.parse(sp[2])) : null,
json: sp[3]
});
});
this.cbResult(res);
}
break;
}
}
};
Worker.prototype.parse = function(data, startIdx, cbResult) {
this.cbResult = cbResult;
var msg = "1" + startIdx + "|" + data;
this.sendMsg(msg);
};
Worker.prototype.destroy = function() {
this.cp.kill();
};
Worker.prototype.sendMsg = function(msg) {
this.cp.stdin.write(msg + eom, "utf8");
// this.cp.send(msg)
};