var util = require("util");
var Transform = require("stream").Transform;
var os = require("os");
var stripBom = require('strip-bom');
var eol = os.EOL;
var defParam = require("./defParam");
var fileline = require("./fileline");
var fileLineToCSVLine = require("./fileLineToCSVLine");
var linesToJson = require("./linesToJson");
var CSVError = require("./CSVError");
var workerMgr = null; // required lazily by initWorker() when workers are requested
var _ = require('lodash');
var rowSplit = require("./rowSplit");

/**
 * Streaming CSV-to-JSON converter implemented as a Transform stream.
 *
 * Which outputs are produced (events, pushed JSON text, objectMode objects) is
 * decided on the next tick from the listeners attached at that point, unless a
 * `_need*` flag has already been set to a non-null value.
 *
 * @param {Object} [params]  parser parameters, normalised by defParam()
 * @param {Object} [options] options passed to stream.Transform (e.g. objectMode)
 * @returns {Converter}
 */
function Converter(params, options) {
  Transform.call(this, options);
  this._options = options || {};
  this.param = defParam(params);
  this.param._options = this._options;
  this.started = false; // true once the first chunk has been received
  this.recordNum = 0;
  this.lineNumber = 0; // file line number
  // Unparsed data carried between chunks: a string partial line, or a Buffer
  // when a multi-byte UTF-8 character was held back at a chunk boundary.
  this._csvLineBuffer = "";
  this.lastIndex = 0; // index in result json array
  if (this.param.forked) {
    this.param.forked = false;
    this.workerNum = 2;
  }
  this.flushCb = null;
  this.processEnd = false;
  this.sequenceBuffer = []; // worker batches awaiting results, in submission order
  this._needJson = null;
  this._needEmitResult = null;
  this._needEmitFinalResult = null;
  this._needEmitHeader = null;
  this._needEmitJson = null;
  this._needPush = null;
  this._needEmitCsv = null;
  this._csvTransf = null;
  this.finalResult = [];
  this.on("error", emitDone(this));
  this.on("end", emitDone(this));
  this.initWorker();
  // Listeners are normally attached right after construction, so wait one tick
  // before deciding which outputs are actually needed.
  process.nextTick(function () {
    var hasListener = function (evt) {
      return this.listeners(evt).length > 0;
    }.bind(this);
    if (this._needEmitFinalResult === null) {
      this._needEmitFinalResult = hasListener("end_parsed");
    }
    if (this._needEmitResult === null) {
      this._needEmitResult = hasListener("record_parsed");
    }
    if (this._needEmitJson === null) {
      this._needEmitJson = hasListener("json");
    }
    if (this._needEmitHeader === null) {
      this._needEmitHeader = hasListener("header");
    }
    if (this._needEmitCsv === null) {
      this._needEmitCsv = hasListener("csv");
    }
    if (this._needJson === null) {
      this._needJson = this._needEmitJson || this._needEmitFinalResult || this._needEmitResult || this.transform || this._options.objectMode;
    }
    if (this._needPush === null) {
      this._needPush = hasListener("data") || hasListener("readable");
    }
    this.param._needParseJson = this._needJson || this._needPush;
  }.bind(this));
  return this;
}
util.inherits(Converter, Transform);

/**
 * Build a listener that emits "done" (with the listener's argument, on the next
 * tick) the first time any such listener for `conv` runs. The constructor attaches
 * one to both "error" and "end", so "done" is emitted at most once per converter.
 *
 * @param {Converter} conv
 * @returns {Function}
 */
function emitDone(conv) {
  return function (err) {
    if (conv._hasDone) {
      return;
    }
    conv._hasDone = true;
    process.nextTick(function () {
      conv.emit('done', err);
    });
  };
}

/**
 * Encode a string as a UTF-8 Buffer, falling back to the deprecated
 * `new Buffer(size)` where `Buffer.allocUnsafe` is unavailable.
 *
 * @param {string} str
 * @returns {Buffer}
 */
function bufFromString(str) {
  var length = Buffer.byteLength(str);
  var buffer = Buffer.allocUnsafe ? Buffer.allocUnsafe(length) : new Buffer(length);
  buffer.write(str);
  return buffer;
}

/**
 * Return the offset at which a trailing multi-byte UTF-8 sequence starts, or
 * `buf.length` when the last byte is ASCII (or `buf` is empty).
 *
 * The trailing sequence is reported whether or not it is complete; holding a
 * complete character back is harmless because it is prepended to the next chunk.
 * The result is never negative, even for a buffer made only of continuation bytes.
 *
 * @param {Buffer} buf
 * @returns {number}
 */
function utf8TailStart(buf) {
  var idx = buf.length - 1;
  if ((buf[idx] & 0x80) !== 0) {
    // step back over continuation bytes (10xxxxxx) ...
    while ((buf[idx] & 0xC0) === 0x80) {
      idx--;
    }
    // ... and then over the lead byte
    idx--;
  }
  return Math.max(idx + 1, 0);
}

/**
 * Transform hook: decode one chunk and parse every complete line in it.
 *
 * From Keyang: a single UTF-8 character (possibly several bytes) can be split
 * across two chunks, so the trailing multi-byte character is held back as raw
 * bytes and prepended to the next chunk (or to the final flush) before decoding.
 */
Converter.prototype._transform = function (data, encoding, cb) {
  var self = this;
  data = this.prepareData(data);
  var tailStart = utf8TailStart(data);
  if (tailStart < data.length) {
    var left = data.slice(tailStart);
    data = data.slice(0, tailStart);
    var done = cb;
    cb = function () {
      // _csvLineBuffer now holds whatever this chunk left unparsed (possibly "");
      // append the held-back bytes to it.
      var partial = self._csvLineBuffer;
      if (partial) {
        if (typeof partial === "string") {
          partial = bufFromString(partial);
        }
        self._csvLineBuffer = Buffer.concat([partial, left]);
      } else {
        self._csvLineBuffer = left;
      }
      done();
    };
  }
  data = data.toString("utf8");
  if (this.started === false) {
    this.started = true;
    data = stripBom(data);
    if (this.param.toArrayString && this._needPush) {
      this.push("[" + eol, "utf8");
    }
  }
  this.preProcessRaw(data, function (d) {
    if (d && d.length > 0) {
      self.processData(d, cb);
    } else {
      cb();
    }
  });
};

/**
 * Prepend the carried-over partial data to a new chunk.
 *
 * The carried data is cleared here because it now lives in the returned chunk;
 * whatever remains unparsed afterwards is stored again via setPartialData().
 * Without clearing, a chunk that produced no text would leave the stale buffer
 * behind to be prepended a second time.
 *
 * @param {Buffer|string} data incoming chunk
 * @returns {Buffer|string} the chunk, preceded by any carried-over data
 */
Converter.prototype.prepareData = function (data) {
  var pending = this._csvLineBuffer;
  if (!pending || pending.length === 0) {
    return data;
  }
  this._csvLineBuffer = "";
  if (typeof pending === "string") {
    pending = bufFromString(pending);
  }
  if (typeof data === "string") {
    data = bufFromString(data);
  }
  return Buffer.concat([pending, data]);
};

/**
 * Remember data that could not be parsed yet; it is prepended to the next chunk.
 *
 * @param {string|Buffer} d
 */
Converter.prototype.setPartialData = function (d) {
  this._csvLineBuffer = d;
};

/**
 * Split decoded text into file lines and turn the complete ones into records.
 *
 * Until the header is known, lines go through processHead(). After that they are
 * parsed inline (workerNum <= 1) or handed to the worker pool. Anything that
 * cannot be parsed yet is stored via setPartialData().
 *
 * @param {string} data decoded CSV text, including any carried-over partial line
 * @param {Function} cb called once this text has been handled
 */
Converter.prototype.processData = function (data, cb) {
  var params = this.param;
  if (params.ignoreEmpty && !params._headers) {
    data = data.replace(/^\s+/, "");
  }
  var eolBefore = params.eol;
  var fileLines = fileline(data, params);
  // fileline() may set params.eol; report it when it changes
  if (params.eol !== eolBefore) {
    this.emit("eol", params.eol);
  }
  if (fileLines.lines.length === 0) {
    this.setPartialData(fileLines.partial);
    cb();
    return;
  }
  if (this.preProcessLine && typeof this.preProcessLine === "function") {
    fileLines.lines = this._preProcessLines(fileLines.lines, this.lastIndex);
  }
  if (!params._headers) {
    // header is not initialised yet
    this.processHead(fileLines, cb);
  } else if (params.workerNum <= 1) {
    var csvLines = fileLineToCSVLine(fileLines, params);
    this.setPartialData(csvLines.partial);
    var jsonArr = linesToJson(csvLines.lines, params, this.recordNum);
    this.processResult(jsonArr);
    this.lastIndex += jsonArr.length;
    this.recordNum += jsonArr.length;
    cb();
  } else {
    this.workerProcess(fileLines, cb);
  }
};

/**
 * Run the preProcessLine hook over each line. Line numbers are 1-based and start
 * after `startIdx`. A non-string result emits an "error" and keeps the original line.
 *
 * @param {string[]} lines
 * @param {number} startIdx
 * @returns {string[]}
 */
Converter.prototype._preProcessLines = function (lines, startIdx) {
  var rtn = [];
  for (var i = 0, len = lines.length; i < len; i++) {
    var result = this.preProcessLine(lines[i], startIdx + i + 1);
    if (typeof result === "string") {
      rtn.push(result);
    } else {
      rtn.push(lines[i]);
      this.emit("error", new Error("preProcessLine should return a string but got: " + JSON.stringify(result)));
    }
  }
  return rtn;
};

/**
 * Start `param.workerNum - 1` workers through ./workerMgr when that count is positive.
 */
Converter.prototype.initWorker = function () {
  var workerNum = this.param.workerNum - 1;
  if (workerNum > 0) {
    workerMgr = require("./workerMgr");
    this.workerMgr = workerMgr();
    this.workerMgr.initWorker(workerNum, this.param);
  }
};

/**
 * Install a hook called as func(rawText, cb) for every decoded chunk; it must call
 * cb with the (possibly modified) text. Chainable.
 */
Converter.prototype.preRawData = function (func) {
  this.preProcessRaw = func;
  return this;
};

/**
 * Install a hook called as func(line, lineNumber) for every file line; it must
 * return a string. Chainable.
 */
Converter.prototype.preFileLine = function (func) {
  this.preProcessLine = func;
  return this;
};

/**
 * Send a batch of complete lines to the worker pool.
 * workerProcess does not support embedded multi-line records.
 *
 * Results may arrive out of order. Each batch reserves a slot in sequenceBuffer,
 * and results are released only once every earlier batch has finished.
 *
 * @param {{lines: string[], partial: string}} fileLine
 * @param {Function} cb passed through to workerMgr.sendWorker
 */
Converter.prototype.workerProcess = function (fileLine, cb) {
  var self = this;
  var lineEnd = this.getEol();
  var startIdx = this.lastIndex;
  this.setPartialData(fileLine.partial);
  // Reserve the ordering slot and advance lastIndex before dispatching, so that
  // a synchronous reply or a re-entrant chunk never sees stale state.
  this.sequenceBuffer.push({ idx: startIdx, result: null });
  this.lastIndex += fileLine.lines.length;
  this.workerMgr.sendWorker(fileLine.lines.join(lineEnd) + lineEnd, startIdx, cb, function (results, lastIndex) {
    var head = self.sequenceBuffer[0];
    if (head.idx === lastIndex) {
      head.result = results;
      var records = [];
      while (self.sequenceBuffer[0] && self.sequenceBuffer[0].result) {
        records = records.concat(self.sequenceBuffer.shift().result);
      }
      self.processResult(records);
      self.recordNum += records.length;
      return;
    }
    for (var i = 0, len = self.sequenceBuffer.length; i < len; i++) {
      if (self.sequenceBuffer[i].idx === lastIndex) {
        self.sequenceBuffer[i].result = results;
        break;
      }
    }
  });
};

/**
 * Establish the header row from the first line(s), then parse the rest of the
 * batch as data.
 *
 * Header lines are shifted off `fileLine.lines` in place, so the later
 * fileLineToCSVLine() call only sees data lines. If the header row is not complete
 * yet (rowSplit reports it as not closed), the consumed header text and the
 * chunk's trailing partial line are both kept for the next call.
 *
 * @param {{lines: string[], partial: string}} fileLine
 * @param {Function} cb
 */
Converter.prototype.processHead = function (fileLine, cb) {
  var params = this.param;
  if (params._headers) {
    return cb();
  }
  // dirty hack: row filtering is switched off while the header row is split
  params._needFilterRow = false;
  var rawLines = fileLine.lines;
  var left = "";
  var headerRow = [];
  if (!params.noheader) {
    while (rawLines.length) {
      var line = left + rawLines.shift();
      var delimiterBefore = params.delimiter;
      var row = rowSplit(line, params);
      if (params.delimiter !== delimiterBefore) {
        this.emit("delimiter", params.delimiter);
      }
      if (row.closed) {
        headerRow = row.cols;
        left = "";
        break;
      }
      // row not closed yet (presumably an open quote): join it with the next line
      left = line + this.getEol();
    }
  }
  params._needFilterRow = true;
  if (!params.noheader && headerRow.length === 0) {
    // This chunk does not complete the header row. Keep the accumulated text AND
    // the chunk's trailing partial line so that nothing is lost.
    this.setPartialData(left + (fileLine.partial || ""));
    return cb();
  }
  if (params.headers) {
    params._headers = params.headers;
  } else {
    params._headers = params.noheader ? [] : headerRow;
  }
  configIgnoreIncludeColumns(params);
  params._headers = require("./filterRow")(params._headers, params);
  if (this._needEmitHeader && this.param._headers) {
    this.emit("header", this.param._headers);
  }
  var delimiterBeforeBody = params.delimiter;
  var csvLines = fileLineToCSVLine(fileLine, params);
  if (params.delimiter !== delimiterBeforeBody) {
    this.emit("delimiter", params.delimiter);
  }
  this.setPartialData(csvLines.partial);
  if (this.param.workerNum > 1) {
    this.workerMgr.setParams(params);
  }
  var res = linesToJson(csvLines.lines, params, 0);
  this.processResult(res);
  this.lastIndex += res.length;
  this.recordNum += res.length;
  cb();
};

/**
 * Replace, in place, every string entry of `columns` with its index in `headers`
 * (-1 when the name is not a header). Numeric entries are left untouched.
 *
 * @param {Array<string|number>} columns
 * @param {string[]} headers
 */
function resolveColumnNames(columns, headers) {
  for (var i = 0; i < columns.length; i++) {
    if (typeof columns[i] === "string") {
      columns[i] = headers.indexOf(columns[i]);
    }
  }
}

/**
 * Turn ignore/include column names into header indexes once the headers are known.
 * Ignored indexes are sorted in descending order, and both lists are de-duplicated.
 *
 * @param {Object} params parser parameters; mutated in place
 */
function configIgnoreIncludeColumns(params) {
  if (params._postIgnoreColumns) {
    resolveColumnNames(params.ignoreColumns, params._headers);
    params.ignoreColumns.sort(function (a, b) {
      return b - a;
    });
  }
  if (params._postIncludeColumns) {
    resolveColumnNames(params.includeColumns, params._headers);
  }
  params.ignoreColumns = _.uniq(params.ignoreColumns);
  params.includeColumns = _.uniq(params.includeColumns);
}

/**
 * Emit each parsed record. Entries carrying `err` are emitted as "error" events instead.
 *
 * @param {Array<Object>} result
 */
Converter.prototype.processResult = function (result) {
  for (var i = 0, len = result.length; i < len; i++) {
    var r = result[i];
    if (r.err) {
      this.emit("error", r.err);
    } else {
      this.emitResult(r);
    }
  }
};

/**
 * Deliver one record to every interested consumer: the transform hook, the "json",
 * "csv" and "record_parsed" events, the final result array, and the readable side
 * (an object in objectMode, JSON text otherwise).
 *
 * @param {{index: number, row: (string|Array), json: (string|Object)}} r
 */
Converter.prototype.emitResult = function (r) {
  var index = r.index;
  var row = r.row;
  var result = r.json;
  var resultJson = null;
  var resultStr = null;
  // r.json may arrive already serialised; only parse it when objects are needed
  if (typeof result === "string") {
    resultStr = result;
  } else {
    resultJson = result;
  }
  if (resultJson === null && this._needJson) {
    resultJson = JSON.parse(resultStr);
    if (typeof row === "string") {
      row = JSON.parse(row);
    }
  }
  if (this.transform && typeof this.transform === "function") {
    this.transform(resultJson, row, index);
    // the hook may modify resultJson in place, so any cached string is stale
    resultStr = null;
  }
  if (this._needEmitJson) {
    this.emit("json", resultJson, index);
  }
  if (this._needEmitCsv) {
    if (typeof row === "string") {
      row = JSON.parse(row);
    }
    this.emit("csv", row, index);
  }
  if (this.param.constructResult && this._needEmitFinalResult) {
    this.finalResult.push(resultJson);
  }
  if (this._needEmitResult) {
    this.emit("record_parsed", resultJson, row, index);
  }
  if (this.param.toArrayString && index > 0 && this._needPush) {
    this.push("," + eol);
  }
  if (this._options && this._options.objectMode) {
    this.push(resultJson);
  } else if (this._needPush) {
    if (resultStr === null) {
      resultStr = JSON.stringify(resultJson);
    }
    this.push(!this.param.toArrayString ? resultStr + eol : resultStr, "utf8");
  }
};

/**
 * Default raw-data hook: passes the decoded chunk through unchanged.
 */
Converter.prototype.preProcessRaw = function (data, cb) {
  cb(data);
};
// FIXME: lineNumber is not used.
/**
 * Default per-line hook: returns the line unchanged. Replace it with preFileLine().
 *
 * @param {string} line
 * @param {number} lineNumber unused by this default
 * @returns {string}
 */
Converter.prototype.preProcessLine = function (line, lineNumber) {
  return line;
};

/**
 * Transform flush hook: parse whatever is still buffered, then finish.
 *
 * flushCb emits "end_parsed" with finalResult, stops the worker pool, calls the
 * stream's flush callback, and emits "end" itself when nothing is pushed downstream.
 *
 * @param {Function} cb stream flush callback
 */
Converter.prototype._flush = function (cb) {
  var self = this;
  this.flushCb = function () {
    self.emit("end_parsed", self.finalResult);
    if (self.workerMgr) {
      self.workerMgr.destroyWorker();
    }
    cb();
    if (!self._needPush) {
      self.emit("end");
    }
  };
  if (this._csvLineBuffer.length === 0) {
    this.checkAndFlush();
    return;
  }
  var lineEnd = this.getEol();
  // The buffer may be a Buffer when UTF-8 bytes were held back; decode it explicitly.
  var pending = typeof this._csvLineBuffer === "string" ? this._csvLineBuffer : this._csvLineBuffer.toString("utf8");
  // Compare the whole terminator (it may be "\r\n"), not just the last character.
  if (pending.slice(-lineEnd.length) !== lineEnd) {
    pending += lineEnd;
  }
  this._csvLineBuffer = pending;
  this.processData(pending, function () {
    self.checkAndFlush();
  });
};

/**
 * Report anything still unparsed as an unclosed-quote error, close the JSON array
 * when toArrayString output is pushed, then run flushCb. If the worker pool is
 * still running, flushCb is deferred until the pool drains.
 */
Converter.prototype.checkAndFlush = function () {
  if (this._csvLineBuffer.length !== 0) {
    this.emit("error", CSVError.unclosed_quote(this.recordNum, this._csvLineBuffer), this._csvLineBuffer);
  }
  if (this.param.toArrayString && this._needPush) {
    this.push(eol + "]", "utf8");
  }
  if (this.workerMgr && this.workerMgr.isRunning()) {
    this.workerMgr.drain = function () {
      this.flushCb();
    }.bind(this);
  } else {
    this.flushCb();
  }
};

/**
 * Return the line terminator in use.
 *
 * When none is configured and `data` is given, it is detected from the first
 * "\r\n", "\r" or "\n" in `data` (falling back to os.EOL) and stored in param.eol.
 *
 * @param {string} [data]
 * @returns {string}
 */
Converter.prototype.getEol = function (data) {
  if (!this.param.eol && data) {
    for (var i = 0, len = data.length; i < len; i++) {
      if (data[i] === "\r") {
        this.param.eol = data[i + 1] === "\n" ? "\r\n" : "\r";
        return this.param.eol;
      }
      if (data[i] === "\n") {
        this.param.eol = "\n";
        return this.param.eol;
      }
    }
    this.param.eol = eol;
  }
  return this.param.eol || eol;
};

/**
 * Parse a CSV file by piping a read stream into this converter.
 *
 * @param {string} filePath
 * @param {Function|Object} [cb] node-style callback; an object here is treated as `options`
 * @param {Object} [options] passed to fs.createReadStream
 * @returns {Converter} this
 */
Converter.prototype.fromFile = function (filePath, cb, options) {
  var fs = require('fs');
  var self = this;
  var rs = null;
  if (typeof cb === "object" && typeof options === "undefined") {
    options = cb;
    cb = null;
  }
  this.wrapCallback(cb, function () {
    if (rs && rs.destroy) {
      rs.destroy();
    }
  });
  fs.exists(filePath, function (exist) {
    if (!exist) {
      self.emit('error', new Error("File does not exist. Check to make sure the file path to your csv is correct."));
      return;
    }
    rs = fs.createReadStream(filePath, options);
    // pipe() does not forward source errors. Without this listener an unreadable
    // path (e.g. EACCES, EISDIR) throws as an unhandled 'error' event and the
    // callback never runs.
    rs.on("error", function (err) {
      self.emit("error", err);
    });
    rs.pipe(self);
  });
  return this;
};

/**
 * Parse CSV from an existing readable stream.
 *
 * @param {stream.Readable} readStream
 * @param {Function} [cb] node-style callback (err, result)
 * @returns {Converter} this
 */
Converter.prototype.fromStream = function (readStream, cb) {
  if (typeof cb === "function") {
    this.wrapCallback(cb);
  }
  readStream.pipe(this);
  return this;
};

/**
 * Register a per-record hook, called as transform(json, row, index) before the
 * record is emitted. Chainable.
 */
Converter.prototype.transf = function (func) {
  this.transform = func;
  return this;
};

/**
 * Parse an in-memory CSV string. The data is written on the next tick, so
 * listeners attached right after this call still see it.
 *
 * If a callback is given and `csvString` is not a string, the callback receives
 * an Error synchronously and nothing is parsed. `this` is always returned, so
 * chaining does not break on invalid input.
 *
 * @param {string} csvString
 * @param {Function} [cb] node-style callback (err, result)
 * @returns {Converter} this
 */
Converter.prototype.fromString = function (csvString, cb) {
  var hasCb = typeof cb === "function";
  if (typeof csvString !== "string" && hasCb) {
    cb(new Error("Passed CSV Data is not a string."));
    return this;
  }
  if (hasCb) {
    this.wrapCallback(cb, function () { });
  }
  process.nextTick(function () {
    this.end(csvString);
  }.bind(this));
  return this;
};

/**
 * Adapt the event API to a node-style callback.
 *
 * - On "end_parsed", calls cb(null, result) unless an error was seen first.
 * - On the first "error", sets hasError, calls cb(err), and then runs `clean`.
 *
 * @param {Function} [cb]
 * @param {Function} [clean] cleanup to run after an error
 */
Converter.prototype.wrapCallback = function (cb, clean) {
  if (clean === undefined) {
    clean = function () { };
  }
  var hasCb = typeof cb === "function";
  if (hasCb) {
    this.once("end_parsed", function (res) {
      if (!this.hasError) {
        cb(null, res);
      }
    }.bind(this));
  }
  this.once("error", function (err) {
    this.hasError = true;
    if (hasCb) {
      cb(err);
    }
    clean();
  }.bind(this));
};

module.exports = Converter;