123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- 'use strict';
- var STREAM = require('stream'),
- UTIL = require('util'),
- StringDecoder = require('string_decoder').StringDecoder;
/**
 * Read-only in-memory stream.
 *
 * May be invoked with or without `new`; a plain call is redirected to a
 * constructor call with the same arguments.
 *
 * @param {*} data - initial content, forwarded untouched to `this.init`.
 * @param {Object} [options] - passed both to the parent stream constructor
 *     and to `this.init`.
 */
function MemoryReadableStream(data, options) {
    if (this instanceof MemoryReadableStream) {
        // Run the parent constructor recorded in `super_`, then set up the queue.
        MemoryReadableStream.super_.call(this, options);
        this.init(data, options);
        return;
    }
    return new MemoryReadableStream(data, options);
}
UTIL.inherits(MemoryReadableStream, STREAM.Readable);
/**
 * Write-only in-memory stream.
 *
 * May be invoked with or without `new`; a plain call is redirected to a
 * constructor call with the same arguments.
 *
 * @param {*} data - initial content, forwarded untouched to `this.init`.
 * @param {Object} [options] - passed both to the parent stream constructor
 *     and to `this.init`.
 */
function MemoryWritableStream(data, options) {
    if (this instanceof MemoryWritableStream) {
        // Run the parent constructor recorded in `super_`, then set up the queue.
        MemoryWritableStream.super_.call(this, options);
        this.init(data, options);
        return;
    }
    return new MemoryWritableStream(data, options);
}
UTIL.inherits(MemoryWritableStream, STREAM.Writable);
/**
 * Readable-and-writable in-memory stream.
 *
 * May be invoked with or without `new`; a plain call is redirected to a
 * constructor call with the same arguments.
 *
 * @param {*} data - initial content, forwarded untouched to `this.init`.
 * @param {Object} [options] - passed both to the parent stream constructor
 *     and to `this.init`.
 */
function MemoryDuplexStream(data, options) {
    if (this instanceof MemoryDuplexStream) {
        // Run the parent constructor recorded in `super_`, then set up the queue.
        MemoryDuplexStream.super_.call(this, options);
        this.init(data, options);
        return;
    }
    return new MemoryDuplexStream(data, options);
}
UTIL.inherits(MemoryDuplexStream, STREAM.Duplex);
/**
 * Shared initialiser for all three stream classes.
 *
 * Builds `this.queue` from `data` and copies the memory-stream specific
 * settings from `options` onto the instance.
 *
 * @param {*} [data] - a single chunk or an array of chunks. Falsy values
 *     leave the queue empty. Chunks that are not Buffers are converted:
 *     numbers become a zero-filled Buffer of that size, anything else goes
 *     through `Buffer.from`.
 * @param {Object} [options]
 * @param {*} [options.maxbufsize] - stored as `this.maxbufsize` (null if absent).
 * @param {*} [options.bufoverflow] - stored as `this.bufoverflow` (null if absent).
 * @param {*} [options.frequence] - stored as `this.frequence` (null if absent).
 */
MemoryReadableStream.prototype.init =
MemoryWritableStream.prototype.init =
MemoryDuplexStream.prototype.init = function init (data, options) {
    var self = this,
        // Borrowed so option objects without Object.prototype still work.
        hasOwn = Object.prototype.hasOwnProperty;

    this.queue = [];
    if (data) {
        if (!Array.isArray(data)) {
            data = [ data ];
        }
        data.forEach(function (chunk) {
            if (!Buffer.isBuffer(chunk)) {
                // `new Buffer(x)` is deprecated; these are its documented
                // replacements. A numeric chunk keeps meaning "a buffer of
                // that size", but is now always zero-filled.
                chunk = typeof chunk === 'number' ? Buffer.alloc(chunk)
                                                  : Buffer.from(chunk);
            }
            self.queue.push(chunk);
        });
    }

    options = options || {};

    this.maxbufsize = hasOwn.call(options, 'maxbufsize') ? options.maxbufsize
                                                         : null;
    this.bufoverflow = hasOwn.call(options, 'bufoverflow') ? options.bufoverflow
                                                           : null;
    this.frequence = hasOwn.call(options, 'frequence') ? options.frequence
                                                       : null;
};
/**
 * Factory for in-memory streams; callable with or without `new`.
 *
 * Picks the concrete class from `options.readable` / `options.writable`
 * (each defaults to true when the key is not an own property of `options`).
 *
 * @param {*} [data] - initial content, forwarded to the chosen constructor.
 * @param {Object} [options] - forwarded to the chosen constructor.
 * @returns {MemoryDuplexStream|MemoryReadableStream|MemoryWritableStream}
 * @throws {Error} when both `readable` and `writable` are falsy.
 */
function MemoryStream (data, options) {
    if (!(this instanceof MemoryStream)) {
        return new MemoryStream(data, options);
    }

    options = options || {};

    // Borrowed so option objects without Object.prototype still work.
    var hasOwn = Object.prototype.hasOwnProperty,
        readable = hasOwn.call(options, 'readable') ? options.readable : true,
        writable = hasOwn.call(options, 'writable') ? options.writable : true;

    if (readable && writable) {
        return new MemoryDuplexStream(data, options);
    } else if (readable) {
        return new MemoryReadableStream(data, options);
    } else if (writable) {
        return new MemoryWritableStream(data, options);
    } else {
        throw new Error("Unknown stream type  Readable, Writable or Duplex ");
    }
}
/**
 * Create a read-only memory stream.
 *
 * @param {*} [data] - initial content, forwarded to `MemoryStream`.
 * @param {Object} [options] - extra stream options. The caller's object is
 *     not modified; its own enumerable properties are copied and
 *     `readable: true, writable: false` are set on the copy.
 * @returns {*} whatever `new MemoryStream(data, opts)` returns.
 */
MemoryStream.createReadStream = function (data, options) {
    var opts = {};
    // Copy instead of writing the flags into the caller's object, so the
    // same options object can be reused for other streams afterwards.
    if (options) {
        Object.keys(options).forEach(function (key) {
            opts[key] = options[key];
        });
    }
    opts.readable = true;
    opts.writable = false;
    return new MemoryStream(data, opts);
};
/**
 * Create a write-only memory stream.
 *
 * @param {*} [data] - initial content, forwarded to `MemoryStream`.
 * @param {Object} [options] - extra stream options. The caller's object is
 *     not modified; its own enumerable properties are copied and
 *     `readable: false, writable: true` are set on the copy.
 * @returns {*} whatever `new MemoryStream(data, opts)` returns.
 */
MemoryStream.createWriteStream = function (data, options) {
    var opts = {};
    // Copy instead of writing the flags into the caller's object, so the
    // same options object can be reused for other streams afterwards.
    if (options) {
        Object.keys(options).forEach(function (key) {
            opts[key] = options[key];
        });
    }
    opts.readable = false;
    opts.writable = true;
    return new MemoryStream(data, opts);
};
/**
 * Readable-side pump shared by the readable and duplex classes.
 *
 * - Queue empty, and the stream is not a Duplex whose writable side is still
 *   unfinished: signals end-of-stream with `push(null)`.
 * - Queue empty, but a Duplex is still being written to: does nothing here.
 * - Queue non-empty: after `this.frequence` ms (0 when unset) pushes exactly
 *   one queued chunk.
 *
 * @param {number} n - size hint supplied by the stream machinery (unused).
 */
MemoryReadableStream.prototype._read =
MemoryDuplexStream.prototype._read = function _read (n) {
    var self = this,
        frequence = self.frequence || 0,
        wait_data = this instanceof STREAM.Duplex && ! this._writableState.finished;

    if ( ! this.queue.length) {
        if ( ! wait_data) {
            this.push(null);// finish stream
        }
        return;
    }

    setTimeout(function () {
        if ( ! self.queue.length) {
            return;
        }
        var chunk = self.queue.shift();
        if (chunk && ! self._readableState.ended) {
            // A false return from push() only means "stop pushing for now";
            // the chunk has already been accepted into the read buffer.
            // Putting it back on the queue would deliver it a second time.
            self.push(chunk);
        }
    }, frequence);
};
/**
 * Writable-side sink shared by the writable and duplex classes.
 *
 * Optionally decodes the chunk, enforces `this.maxbufsize`, then either
 * queues the chunk (non-Duplex) or pushes it to the readable side (Duplex,
 * after first flushing anything still in the queue).
 *
 * @param {Buffer|string} chunk - data handed over by the stream machinery.
 * @param {string} encoding - encoding reported for the chunk.
 * @param {Function} cb - completion callback. Called with no argument on
 *     success or when an oversized chunk is silently dropped, with the
 *     thrown error if the decoder cannot be built, or with a string message
 *     when the size limit is exceeded and `this.bufoverflow` is truthy.
 */
MemoryWritableStream.prototype._write =
MemoryDuplexStream.prototype._write = function _write (chunk, encoding, cb) {
    var decoder;
    // NOTE(review): `decodeStrings` is read from the stream instance itself;
    // nothing in this file assigns it — confirm where it is expected to come from.
    try {
        decoder = (this.decodeStrings && encoding) ? new StringDecoder(encoding) : null;
    } catch (err) {
        return cb(err);
    }

    var payload = chunk;
    if (decoder) {
        payload = decoder.write(chunk);
    }
    var queued = this._getQueueSize();
    var incoming = payload.length;

    var overLimit = this.maxbufsize && (queued + incoming) > this.maxbufsize;
    if (overLimit) {
        if ( ! this.bufoverflow) {
            // Limit exceeded without an overflow setting: drop the chunk quietly.
            return cb();
        }
        return cb("Buffer overflowed (" + this.bufoverflow + "/" + queued + ")");
    }

    if ( ! (this instanceof STREAM.Duplex)) {
        this.queue.push(payload);
    } else {
        // Hand over anything still queued first so ordering is kept.
        while (this.queue.length) {
            this.push(this.queue.shift());
        }
        this.push(payload);
    }
    cb();
};
/**
 * End the writable side and, once it has finished, end the readable side too.
 *
 * Accepts the same call shapes as the parent `end`:
 * `end()`, `end(cb)`, `end(chunk)`, `end(chunk, cb)`, `end(chunk, encoding)`
 * and `end(chunk, encoding, cb)`.
 *
 * @param {*} [chunk] - optional final chunk to write.
 * @param {string} [encoding] - encoding of `chunk` when it is a string.
 * @param {Function} [cb] - called (without arguments) after the readable side
 *     has been told to end.
 * @returns {*} the return value of the parent `end` implementation.
 */
MemoryDuplexStream.prototype.end = function (chunk, encoding, cb) {
    var self = this;

    // Normalise the optional-argument forms up front. If a callback were left
    // in the `chunk` or `encoding` slot, the parent `end` would treat it as the
    // completion callback and drop the wrapper below, so the readable side
    // would never receive its end-of-stream marker.
    if (typeof chunk === 'function') {
        cb = chunk;
        chunk = undefined;
        encoding = undefined;
    } else if (typeof encoding === 'function') {
        cb = encoding;
        encoding = undefined;
    }

    return MemoryDuplexStream.super_.prototype.end.call(this, chunk, encoding, function () {
        self.push(null);//finish readable stream too
        if (cb) cb();
    });
};
/**
 * Sum of the `length` of everything currently in `this.queue`.
 *
 * For an entry that is itself an array, the length of its first element is
 * counted; for any other entry, the entry's own `length` is counted.
 *
 * @returns {number} accumulated length (0 for an empty queue).
 */
MemoryReadableStream.prototype._getQueueSize =
MemoryWritableStream.prototype._getQueueSize =
MemoryDuplexStream.prototype._getQueueSize = function () {
    return this.queue.reduce(function (total, entry) {
        var measured = Array.isArray(entry) ? entry[0] : entry;
        return total + measured.length;
    }, 0);
};
/**
 * Return everything currently in `this.queue` as one string.
 * Exposed under two names on every class: `toString` and `getAll`.
 *
 * Consecutive Buffer entries are joined before decoding, so a multi-byte
 * character that is split across two chunks is decoded correctly.
 * Non-Buffer entries are appended using ordinary string concatenation.
 *
 * @returns {string} concatenated queue content ('' for an empty queue).
 */
MemoryWritableStream.prototype.toString =
MemoryDuplexStream.prototype.toString =
MemoryReadableStream.prototype.toString =
MemoryWritableStream.prototype.getAll =
MemoryDuplexStream.prototype.getAll =
MemoryReadableStream.prototype.getAll = function () {
    var ret = '',
        pending = [];

    // Decode the Buffers collected so far as one unit.
    function flush() {
        if (pending.length) {
            ret += Buffer.concat(pending).toString();
            pending = [];
        }
    }

    this.queue.forEach(function (data) {
        if (Buffer.isBuffer(data)) {
            pending.push(data);
        } else {
            flush();
            ret += data;
        }
    });
    flush();
    return ret;
};
/**
 * Return everything currently in `this.queue` as one newly allocated Buffer.
 *
 * Entries that are not Buffers are converted with `Buffer.from` first, so the
 * result is sized by byte length rather than by the entries' `length`
 * (which, for strings, counts characters and under-allocates for multi-byte
 * text).
 *
 * @returns {Buffer} concatenated queue content (empty Buffer for an empty queue).
 */
MemoryWritableStream.prototype.toBuffer =
MemoryDuplexStream.prototype.toBuffer =
MemoryReadableStream.prototype.toBuffer = function () {
    var parts = this.queue.map(function (data) {
        return Buffer.isBuffer(data) ? data : Buffer.from(data);
    });
    return Buffer.concat(parts);
};
- module.exports = MemoryStream;
|