193 lines
20 KiB
Plaintext
193 lines
20 KiB
Plaintext
"use strict";
|
|
var __extends = (this && this.__extends) || (function () {
|
|
var extendStatics = Object.setPrototypeOf ||
|
|
({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
|
|
function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; };
|
|
return function (d, b) {
|
|
extendStatics(d, b);
|
|
function __() { this.constructor = d; }
|
|
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
|
|
};
|
|
})();
|
|
var __importDefault = (this && this.__importDefault) || function (mod) {
|
|
return (mod && mod.__esModule) ? mod : { "default": mod };
|
|
};
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
var stream_1 = require("stream");
|
|
var Parameters_1 = require("./Parameters");
|
|
var ParseRuntime_1 = require("./ParseRuntime");
|
|
var bluebird_1 = __importDefault(require("bluebird"));
|
|
// import { ProcessorFork } from "./ProcessFork";
|
|
var ProcessorLocal_1 = require("./ProcessorLocal");
|
|
var Result_1 = require("./Result");
|
|
var Converter = /** @class */ (function (_super) {
|
|
__extends(Converter, _super);
|
|
function Converter(param, options) {
|
|
if (options === void 0) { options = {}; }
|
|
var _this = _super.call(this, options) || this;
|
|
_this.options = options;
|
|
_this.params = Parameters_1.mergeParams(param);
|
|
_this.runtime = ParseRuntime_1.initParseRuntime(_this);
|
|
_this.result = new Result_1.Result(_this);
|
|
// if (this.params.fork) {
|
|
// this.processor = new ProcessorFork(this);
|
|
// } else {
|
|
_this.processor = new ProcessorLocal_1.ProcessorLocal(_this);
|
|
// }
|
|
_this.once("error", function (err) {
|
|
// console.log("BBB");
|
|
//wait for next cycle to emit the errors.
|
|
setImmediate(function () {
|
|
_this.result.processError(err);
|
|
_this.emit("done", err);
|
|
});
|
|
});
|
|
_this.once("done", function () {
|
|
_this.processor.destroy();
|
|
});
|
|
return _this;
|
|
}
|
|
Converter.prototype.preRawData = function (onRawData) {
|
|
this.runtime.preRawDataHook = onRawData;
|
|
return this;
|
|
};
|
|
Converter.prototype.preFileLine = function (onFileLine) {
|
|
this.runtime.preFileLineHook = onFileLine;
|
|
return this;
|
|
};
|
|
Converter.prototype.subscribe = function (onNext, onError, onCompleted) {
|
|
this.parseRuntime.subscribe = {
|
|
onNext: onNext,
|
|
onError: onError,
|
|
onCompleted: onCompleted
|
|
};
|
|
return this;
|
|
};
|
|
Converter.prototype.fromFile = function (filePath, options) {
|
|
var _this = this;
|
|
var fs = require("fs");
|
|
// var rs = null;
|
|
// this.wrapCallback(cb, function () {
|
|
// if (rs && rs.destroy) {
|
|
// rs.destroy();
|
|
// }
|
|
// });
|
|
fs.exists(filePath, function (exist) {
|
|
if (exist) {
|
|
var rs = fs.createReadStream(filePath, options);
|
|
rs.pipe(_this);
|
|
}
|
|
else {
|
|
_this.emit('error', new Error("File does not exist. Check to make sure the file path to your csv is correct."));
|
|
}
|
|
});
|
|
return this;
|
|
};
|
|
Converter.prototype.fromStream = function (readStream) {
|
|
readStream.pipe(this);
|
|
return this;
|
|
};
|
|
Converter.prototype.fromString = function (csvString) {
|
|
var csv = csvString.toString();
|
|
var read = new stream_1.Readable();
|
|
var idx = 0;
|
|
read._read = function (size) {
|
|
if (idx >= csvString.length) {
|
|
this.push(null);
|
|
}
|
|
else {
|
|
var str = csvString.substr(idx, size);
|
|
this.push(str);
|
|
idx += size;
|
|
}
|
|
};
|
|
return this.fromStream(read);
|
|
};
|
|
Converter.prototype.then = function (onfulfilled, onrejected) {
|
|
var _this = this;
|
|
return new bluebird_1.default(function (resolve, reject) {
|
|
_this.parseRuntime.then = {
|
|
onfulfilled: function (value) {
|
|
if (onfulfilled) {
|
|
resolve(onfulfilled(value));
|
|
}
|
|
else {
|
|
resolve(value);
|
|
}
|
|
},
|
|
onrejected: function (err) {
|
|
if (onrejected) {
|
|
resolve(onrejected(err));
|
|
}
|
|
else {
|
|
reject(err);
|
|
}
|
|
}
|
|
};
|
|
});
|
|
};
|
|
Object.defineProperty(Converter.prototype, "parseParam", {
|
|
get: function () {
|
|
return this.params;
|
|
},
|
|
enumerable: true,
|
|
configurable: true
|
|
});
|
|
Object.defineProperty(Converter.prototype, "parseRuntime", {
|
|
get: function () {
|
|
return this.runtime;
|
|
},
|
|
enumerable: true,
|
|
configurable: true
|
|
});
|
|
Converter.prototype._transform = function (chunk, encoding, cb) {
|
|
var _this = this;
|
|
this.processor.process(chunk)
|
|
.then(function (result) {
|
|
// console.log(result);
|
|
if (result.length > 0) {
|
|
_this.runtime.started = true;
|
|
return _this.result.processResult(result);
|
|
}
|
|
})
|
|
.then(function () {
|
|
_this.emit("drained");
|
|
cb();
|
|
}, function (error) {
|
|
_this.runtime.hasError = true;
|
|
_this.runtime.error = error;
|
|
_this.emit("error", error);
|
|
cb();
|
|
});
|
|
};
|
|
Converter.prototype._flush = function (cb) {
|
|
var _this = this;
|
|
this.processor.flush()
|
|
.then(function (data) {
|
|
if (data.length > 0) {
|
|
return _this.result.processResult(data);
|
|
}
|
|
})
|
|
.then(function () {
|
|
_this.processEnd(cb);
|
|
}, function (err) {
|
|
_this.emit("error", err);
|
|
cb();
|
|
});
|
|
};
|
|
Converter.prototype.processEnd = function (cb) {
|
|
this.result.endProcess();
|
|
this.emit("done");
|
|
cb();
|
|
};
|
|
Object.defineProperty(Converter.prototype, "parsedLineNumber", {
|
|
get: function () {
|
|
return this.runtime.parsedLineNumber;
|
|
},
|
|
enumerable: true,
|
|
configurable: true
|
|
});
|
|
return Converter;
|
|
}(stream_1.Transform));
|
|
exports.Converter = Converter;
|
|
//# sourceMappingURL=data:application/json;charset=utf-8;base64,{"version":3,"file":"/Users/kxiang/work/projects/csv2json/src/Converter.ts","sources":["/Users/kxiang/work/projects/csv2json/src/Converter.ts"],"names":[],"mappings":";;;;;;;;;;;;;;;AAAA,iCAA+D;AAC/D,2CAA0D;AAC1D,+CAAgE;AAChE,sDAAyB;AAOzB,iDAAiD;AACjD,mDAAkD;AAClD,mCAAkC;AAMlC;IAA+B,6BAAS;IAuFtC,mBAAY,KAA8B,EAAS,OAA8B;QAA9B,wBAAA,EAAA,YAA8B;QAAjF,YACE,kBAAM,OAAO,CAAC,SAuBf;QAxBkD,aAAO,GAAP,OAAO,CAAuB;QAE/E,KAAI,CAAC,MAAM,GAAG,wBAAW,CAAC,KAAK,CAAC,CAAC;QACjC,KAAI,CAAC,OAAO,GAAG,+BAAgB,CAAC,KAAI,CAAC,CAAC;QACtC,KAAI,CAAC,MAAM,GAAG,IAAI,eAAM,CAAC,KAAI,CAAC,CAAC;QAC/B,0BAA0B;QAC1B,8CAA8C;QAC9C,WAAW;QACX,KAAI,CAAC,SAAS,GAAG,IAAI,+BAAc,CAAC,KAAI,CAAC,CAAC;QAC1C,IAAI;QACJ,KAAI,CAAC,IAAI,CAAC,OAAO,EAAE,UAAC,GAAQ;YAC1B,sBAAsB;YACtB,yCAAyC;YACzC,YAAY,CAAC;gBACX,KAAI,CAAC,MAAM,CAAC,YAAY,CAAC,GAAG,CAAC,CAAC;gBAC9B,KAAI,CAAC,IAAI,CAAC,MAAM,EAAE,GAAG,CAAC,CAAC;YACzB,CAAC,CAAC,CAAC;QAEL,CAAC,CAAC,CAAC;QACH,KAAI,CAAC,IAAI,CAAC,MAAM,EAAE;YAChB,KAAI,CAAC,SAAS,CAAC,OAAO,EAAE,CAAC;QAC3B,CAAC,CAAC,CAAA;QAEF,OAAO,KAAI,CAAC;IACd,CAAC;IA9GD,8BAAU,GAAV,UAAW,SAA6B;QACtC,IAAI,CAAC,OAAO,CAAC,cAAc,GAAG,SAAS,CAAC;QACxC,OAAO,IAAI,CAAC;IACd,CAAC;IACD,+BAAW,GAAX,UAAY,UAA+B;QACzC,IAAI,CAAC,OAAO,CAAC,eAAe,GAAG,UAAU,CAAC;QAC1C,OAAO,IAAI,CAAC;IACd,CAAC;IACD,6BAAS,GAAT,UACE,MAAoE,EACpE,OAAiC,EACjC,WAAwB;QACxB,IAAI,CAAC,YAAY,CAAC,SAAS,GAAG;YAC5B,MAAM,QAAA;YACN,OAAO,SAAA;YACP,WAAW,aAAA;SACZ,CAAA;QACD,OAAO,IAAI,CAAC;IACd,CAAC;IACD,4BAAQ,GAAR,UAAS,QAAgB,EAAE,OAAqD;QAAhF,iBAiBC;QAhBC,IAAM,EAAE,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;QACzB,iBAAiB;QACjB,sCAAsC;QACtC,4BAA4B;QAC5B,oBAAoB;QACpB,MAAM;QACN,MAAM;QACN,EAAE,CAAC,MAAM,CAAC,QAAQ,EAAE,UAAC,KAAK;YACxB,IAAI,KAAK,EAAE;gBACT,IAAM,EAAE,GAAG,EAAE,CAAC,gBAAgB,CAAC,QAAQ,EAAE,OAAO,CAAC,CAAC;gBAClD,EAAE,CAAC,IAAI,CAAC,KAAI,CAAC,CAAC;aACf;iBAAM;gBACL,KAAI,CAAC,IAAI,CAAC,OAAO,EAAE,IAAI,KAAK,CAAC,+EAA+E,CAAC,CAAC,CAAC;aAChH;QACH,CAAC,CAAC,CAAC;QACH,OAAO,IAAI,CAAC;IACd,CAAC;IACD,8BAAU,GAAV,UAAW,UAAoB;QAC7B,UAAU,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC;QACtB,OAAO,IAAI,CAAC;IACd,CAAC;IACD,8BAAU,GAAV,UAAW,SAAiB;QAC1B,IAAM,GAAG,GAAG,SAAS,CAAC,QAAQ,EAAE,CAAC;QACjC,IAAM,IAAI,GAAG,IAAI,iBAAQ,EAAE,CAAC;QAC5B,IAAI,GAAG,GAAG,CAAC,CAAC;QACZ,IAAI,CAAC,KAAK,GAAG,UAAU,IAAI;YACzB,IAAI,GAAG,IAAI,SAAS,CAAC,MAAM,EAAE;gBAC3B,IAAI,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC;aACjB;iBAAM;gBACL,IAAM,GAAG,GAAG,SAAS,CAAC,MAAM,CAAC,GAAG,EAAE,IAAI,CAAC,CAAC;gBACxC,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,CAAC;gBACf,GAAG,IAAI,IAAI,CAAC;aACb;QACH,CAAC,CAAA;QACD,OAAO,IAAI,CAAC,UAAU,CAAC,IAAI,CAAC,CAAC;IAC/B,CAAC;IACD,wBAAI,GAAJ,UAAyC,WAAgE,EAAE,UAA8D;QAAzK,iBAmBC;QAlBC,OAAO,IAAI,kBAAC,CAAC,UAAC,OAAO,EAAE,MAAM;YAC3B,KAAI,CAAC,YAAY,CAAC,IAAI,GAAG;gBACvB,WAAW,EAAE,UAAC,KAAY;oBACxB,IAAI,WAAW,EAAE;wBACf,OAAO,CAAC,WAAW,CAAC,KAAK,CAAC,CAAC,CAAC;qBAC7B;yBAAM;wBACL,OAAO,CAAC,KAAY,CAAC,CAAC;qBACvB;gBACH,CAAC;gBACD,UAAU,EAAE,UAAC,GAAU;oBACrB,IAAI,UAAU,EAAE;wBACd,OAAO,CAAC,UAAU,CAAC,GAAG,CAAC,CAAC,CAAC;qBAC1B;yBAAM;wBACL,MAAM,CAAC,GAAG,CAAC,CAAC;qBACb;gBACH,CAAC;aACF,CAAA;QACH,CAAC,CAAC,CAAC;IACL,CAAC;IACD,sBAAW,iCAAU;aAArB;YACE,OAAO,IAAI,CAAC,MAAM,CAAC;QACrB,CAAC;;;OAAA;IACD,sBAAW,mCAAY;aAAvB;YACE,OAAO,IAAI,CAAC,OAAO,CAAC;QACtB,CAAC;;;OAAA;IA8BD,8BAAU,GAAV,UAAW,KAAU,EAAE,QAAgB,EAAE,EAAY;QAArD,iBAmBC;QAlBC,IAAI,CAAC,SAAS,CAAC,OAAO,CAAC,KAAK,CAAC;aAC1B,IAAI,CAAC,UAAC,MAAM;YACX,uBAAuB;YACvB,IAAI,MAAM,CAAC,MAAM,GAAG,CAAC,EAAE;gBACrB,KAAI,CAAC,OAAO,CAAC,OAAO,GAAG,IAAI,CAAC;gBAE5B,OAAO,KAAI,CAAC,MAAM,CAAC,aAAa,CAAC,MAAM,CAAC,CAAC;aAC1C;QACH,CAAC,CAAC;aACD,IAAI,CAAC;YACJ,KAAI,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YACrB,EAAE,EAAE,CAAC;QACP,CAAC,EAAE,UAAC,KAAK;YACP,KAAI,CAAC,OAAO,CAAC,QAAQ,GAAG,IAAI,CAAC;YAC7B,KAAI,CAAC,OAAO,CAAC,KAAK,GAAG,KAAK,CAAC;YAC3B,KAAI,CAAC,IAAI,CAAC,OAAO,EAAE,KAAK,CAAC,CAAC;YAC1B,EAAE,EAAE,CAAC;QACP,CAAC,CAAC,CAAC;IACP,CAAC;IACD,0BAAM,GAAN,UAAO,EAAY;QAAnB,iBAcC;QAbC,IAAI,CAAC,SAAS,CAAC,KAAK,EAAE;aACnB,IAAI,CAAC,UAAC,IAAI;YACT,IAAI,IAAI,CAAC,MAAM,GAAG,CAAC,EAAE;gBAEnB,OAAO,KAAI,CAAC,MAAM,CAAC,aAAa,CAAC,IAAI,CAAC,CAAC;aACxC;QACH,CAAC,CAAC;aACD,IAAI,CAAC;YACJ,KAAI,CAAC,UAAU,CAAC,EAAE,CAAC,CAAC;QACtB,CAAC,EAAE,UAAC,GAAG;YACL,KAAI,CAAC,IAAI,CAAC,OAAO,EAAE,GAAG,CAAC,CAAC;YACxB,EAAE,EAAE,CAAC;QACP,CAAC,CAAC,CAAA;IACN,CAAC;IACO,8BAAU,GAAlB,UAAmB,EAAE;QACnB,IAAI,CAAC,MAAM,CAAC,UAAU,EAAE,CAAC;QACzB,IAAI,CAAC,IAAI,CAAC,MAAM,CAAC,CAAC;QAClB,EAAE,EAAE,CAAC;IACP,CAAC;IACD,sBAAI,uCAAgB;aAApB;YACE,OAAO,IAAI,CAAC,OAAO,CAAC,gBAAgB,CAAC;QACvC,CAAC;;;OAAA;IACH,gBAAC;AAAD,CAAC,AA3JD,CAA+B,kBAAS,GA2JvC;AA3JY,8BAAS","sourcesContent":["import { Transform, TransformOptions, Readable } from \"stream\";\nimport { CSVParseParam, mergeParams } from \"./Parameters\";\nimport { ParseRuntime, initParseRuntime } from \"./ParseRuntime\";\nimport P from \"bluebird\";\nimport { stringToLines } from \"./fileline\";\nimport { map } from \"lodash/map\";\nimport { RowSplit, RowSplitResult } from \"./rowSplit\";\nimport getEol from \"./getEol\";\nimport lineToJson, { JSONResult } from \"./lineToJson\";\nimport { Processor, ProcessLineResult } from \"./Processor\";\n// import { ProcessorFork } from \"./ProcessFork\";\nimport { ProcessorLocal } from \"./ProcessorLocal\";\nimport { Result } from \"./Result\";\nimport CSVError from \"./CSVError\";\nimport { bufFromString } from \"./util\";\n\n\n\nexport class Converter extends Transform implements PromiseLike<any[]> {\n  preRawData(onRawData: PreRawDataCallback): Converter {\n    this.runtime.preRawDataHook = onRawData;\n    return this;\n  }\n  preFileLine(onFileLine: PreFileLineCallback): Converter {\n    this.runtime.preFileLineHook = onFileLine;\n    return this;\n  }\n  subscribe(\n    onNext?: (data: any, lineNumber: number) => void | PromiseLike<void>,\n    onError?: (err: CSVError) => void,\n    onCompleted?: () => void): Converter {\n    this.parseRuntime.subscribe = {\n      onNext,\n      onError,\n      onCompleted\n    }\n    return this;\n  }\n  fromFile(filePath: string, options?: string | CreateReadStreamOption | undefined): Converter {\n    const fs = require(\"fs\");\n    // var rs = null;\n    // this.wrapCallback(cb, function () {\n    //   if (rs && rs.destroy) {\n    //     rs.destroy();\n    //   }\n    // });\n    fs.exists(filePath, (exist) => {\n      if (exist) {\n        const rs = fs.createReadStream(filePath, options);\n        rs.pipe(this);\n      } else {\n        this.emit('error', new Error(\"File does not exist. Check to make sure the file path to your csv is correct.\"));\n      }\n    });\n    return this;\n  }\n  fromStream(readStream: Readable): Converter {\n    readStream.pipe(this);\n    return this;\n  }\n  fromString(csvString: string): Converter {\n    const csv = csvString.toString();\n    const read = new Readable();\n    let idx = 0;\n    read._read = function (size) {\n      if (idx >= csvString.length) {\n        this.push(null);\n      } else {\n        const str = csvString.substr(idx, size);\n        this.push(str);\n        idx += size;\n      }\n    }\n    return this.fromStream(read);\n  }\n  then<TResult1 = any[], TResult2 = never>(onfulfilled?: (value: any[]) => TResult1 | PromiseLike<TResult1>, onrejected?: (reason: any) => TResult2 | PromiseLike<TResult2>): PromiseLike<TResult1 | TResult2> {\n    return new P((resolve, reject) => {\n      this.parseRuntime.then = {\n        onfulfilled: (value: any[]) => {\n          if (onfulfilled) {\n            resolve(onfulfilled(value));\n          } else {\n            resolve(value as any);\n          }\n        },\n        onrejected: (err: Error) => {\n          if (onrejected) {\n            resolve(onrejected(err));\n          } else {\n            reject(err);\n          }\n        }\n      }\n    });\n  }\n  public get parseParam(): CSVParseParam {\n    return this.params;\n  }\n  public get parseRuntime(): ParseRuntime {\n    return this.runtime;\n  }\n  private params: CSVParseParam;\n  private runtime: ParseRuntime;\n  private processor: Processor;\n  private result: Result;\n  constructor(param?: Partial<CSVParseParam>, public options: TransformOptions = {}) {\n    super(options);\n    this.params = mergeParams(param);\n    this.runtime = initParseRuntime(this);\n    this.result = new Result(this);\n    // if (this.params.fork) {\n    //   this.processor = new ProcessorFork(this);\n    // } else {\n    this.processor = new ProcessorLocal(this);\n    // }\n    this.once(\"error\", (err: any) => {\n      // console.log(\"BBB\");\n      //wait for next cycle to emit the errors.\n      setImmediate(() => {\n        this.result.processError(err);\n        this.emit(\"done\", err);\n      });\n\n    });\n    this.once(\"done\", () => {\n      this.processor.destroy();\n    })\n\n    return this;\n  }\n  _transform(chunk: any, encoding: string, cb: Function) {\n    this.processor.process(chunk)\n      .then((result) => {\n        // console.log(result);\n        if (result.length > 0) {\n          this.runtime.started = true;\n\n          return this.result.processResult(result);\n        }\n      })\n      .then(() => {\n        this.emit(\"drained\");\n        cb();\n      }, (error) => {\n        this.runtime.hasError = true;\n        this.runtime.error = error;\n        this.emit(\"error\", error);\n        cb();\n      });\n  }\n  _flush(cb: Function) {\n    this.processor.flush()\n      .then((data) => {\n        if (data.length > 0) {\n\n          return this.result.processResult(data);\n        }\n      })\n      .then(() => {\n        this.processEnd(cb);\n      }, (err) => {\n        this.emit(\"error\", err);\n        cb();\n      })\n  }\n  private processEnd(cb) {\n    this.result.endProcess();\n    this.emit(\"done\");\n    cb();\n  }\n  get parsedLineNumber(): number {\n    return this.runtime.parsedLineNumber;\n  }\n}\nexport interface CreateReadStreamOption {\n  flags?: string;\n  encoding?: string;\n  fd?: number;\n  mode?: number;\n  autoClose?: boolean;\n  start?: number;\n  end?: number;\n  highWaterMark?: number;\n}\nexport type CallBack = (err: Error, data: Array<any>) => void;\n\n\nexport type PreFileLineCallback = (line: string, lineNumber: number) => string | PromiseLike<string>;\nexport type PreRawDataCallback = (csvString: string) => string | PromiseLike<string>;\n"]} |