index.js 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. 'use strict';
  2. var STREAM = require('stream'),
  3. UTIL = require('util'),
  4. StringDecoder = require('string_decoder').StringDecoder;
  5. function MemoryReadableStream(data, options) {
  6. if (!(this instanceof MemoryReadableStream))
  7. return new MemoryReadableStream(data, options);
  8. MemoryReadableStream.super_.call(this, options);
  9. this.init(data, options);
  10. }
  11. UTIL.inherits(MemoryReadableStream, STREAM.Readable);
  12. function MemoryWritableStream(data, options) {
  13. if (!(this instanceof MemoryWritableStream))
  14. return new MemoryWritableStream(data, options);
  15. MemoryWritableStream.super_.call(this, options);
  16. this.init(data, options);
  17. }
  18. UTIL.inherits(MemoryWritableStream, STREAM.Writable);
  19. function MemoryDuplexStream(data, options) {
  20. if (!(this instanceof MemoryDuplexStream))
  21. return new MemoryDuplexStream(data, options);
  22. MemoryDuplexStream.super_.call(this, options);
  23. this.init(data, options);
  24. }
  25. UTIL.inherits(MemoryDuplexStream, STREAM.Duplex);
  26. MemoryReadableStream.prototype.init =
  27. MemoryWritableStream.prototype.init =
  28. MemoryDuplexStream.prototype.init = function init (data, options) {
  29. var self = this;
  30. this.queue = [];
  31. if (data) {
  32. if (!Array.isArray(data)) {
  33. data = [ data ];
  34. }
  35. data.forEach(function (chunk) {
  36. if (!(chunk instanceof Buffer)) {
  37. chunk = new Buffer(chunk);
  38. }
  39. self.queue.push(chunk);
  40. });
  41. }
  42. options = options || {};
  43. this.maxbufsize = options.hasOwnProperty('maxbufsize') ? options.maxbufsize
  44. : null;
  45. this.bufoverflow = options.hasOwnProperty('bufoverflow') ? options.bufoverflow
  46. : null;
  47. this.frequence = options.hasOwnProperty('frequence') ? options.frequence
  48. : null;
  49. };
  50. function MemoryStream (data, options) {
  51. if (!(this instanceof MemoryStream))
  52. return new MemoryStream(data, options);
  53. options = options || {};
  54. var readable = options.hasOwnProperty('readable') ? options.readable : true,
  55. writable = options.hasOwnProperty('writable') ? options.writable : true;
  56. if (readable && writable) {
  57. return new MemoryDuplexStream(data, options);
  58. } else if (readable) {
  59. return new MemoryReadableStream(data, options);
  60. } else if (writable) {
  61. return new MemoryWritableStream(data, options);
  62. } else {
  63. throw new Error("Unknown stream type Readable, Writable or Duplex ");
  64. }
  65. }
  66. MemoryStream.createReadStream = function (data, options) {
  67. options = options || {};
  68. options.readable = true;
  69. options.writable = false;
  70. return new MemoryStream(data, options);
  71. };
  72. MemoryStream.createWriteStream = function (data, options) {
  73. options = options || {};
  74. options.readable = false;
  75. options.writable = true;
  76. return new MemoryStream(data, options);
  77. };
  78. MemoryReadableStream.prototype._read =
  79. MemoryDuplexStream.prototype._read = function _read (n) {
  80. var self = this,
  81. frequence = self.frequence || 0,
  82. wait_data = this instanceof STREAM.Duplex && ! this._writableState.finished ? true : false;
  83. if ( ! this.queue.length && ! wait_data) {
  84. this.push(null);// finish stream
  85. } else if (this.queue.length) {
  86. setTimeout(function () {
  87. if (self.queue.length) {
  88. var chunk = self.queue.shift();
  89. if (chunk && ! self._readableState.ended) {
  90. if ( ! self.push(chunk) ) {
  91. self.queue.unshift(chunk);
  92. }
  93. }
  94. }
  95. }, frequence);
  96. }
  97. };
  98. MemoryWritableStream.prototype._write =
  99. MemoryDuplexStream.prototype._write = function _write (chunk, encoding, cb) {
  100. var decoder = null;
  101. try {
  102. decoder = this.decodeStrings && encoding ? new StringDecoder(encoding) : null;
  103. } catch (err){
  104. return cb(err);
  105. }
  106. var decoded_chunk = decoder ? decoder.write(chunk) : chunk,
  107. queue_size = this._getQueueSize(),
  108. chunk_size = decoded_chunk.length;
  109. if (this.maxbufsize && (queue_size + chunk_size) > this.maxbufsize ) {
  110. if (this.bufoverflow) {
  111. return cb("Buffer overflowed (" + this.bufoverflow + "/" + queue_size + ")");
  112. } else {
  113. return cb();
  114. }
  115. }
  116. if (this instanceof STREAM.Duplex) {
  117. while (this.queue.length) {
  118. this.push(this.queue.shift());
  119. }
  120. this.push(decoded_chunk);
  121. } else {
  122. this.queue.push(decoded_chunk);
  123. }
  124. cb();
  125. };
  126. MemoryDuplexStream.prototype.end = function (chunk, encoding, cb) {
  127. var self = this;
  128. return MemoryDuplexStream.super_.prototype.end.call(this, chunk, encoding, function () {
  129. self.push(null);//finish readble stream too
  130. if (cb) cb();
  131. });
  132. };
  133. MemoryReadableStream.prototype._getQueueSize =
  134. MemoryWritableStream.prototype._getQueueSize =
  135. MemoryDuplexStream.prototype._getQueueSize = function () {
  136. var queuesize = 0, i;
  137. for (i = 0; i < this.queue.length; i++) {
  138. queuesize += Array.isArray(this.queue[i]) ? this.queue[i][0].length
  139. : this.queue[i].length;
  140. }
  141. return queuesize;
  142. };
  143. MemoryWritableStream.prototype.toString =
  144. MemoryDuplexStream.prototype.toString =
  145. MemoryReadableStream.prototype.toString =
  146. MemoryWritableStream.prototype.getAll =
  147. MemoryDuplexStream.prototype.getAll =
  148. MemoryReadableStream.prototype.getAll = function () {
  149. var self = this,
  150. ret = '';
  151. this.queue.forEach(function (data) {
  152. ret += data;
  153. });
  154. return ret;
  155. };
  156. MemoryWritableStream.prototype.toBuffer =
  157. MemoryDuplexStream.prototype.toBuffer =
  158. MemoryReadableStream.prototype.toBuffer = function () {
  159. var buffer = new Buffer(this._getQueueSize()),
  160. currentOffset = 0;
  161. this.queue.forEach(function (data) {
  162. var data_buffer = data instanceof Buffer ? data : new Buffer(data);
  163. data_buffer.copy(buffer, currentOffset);
  164. currentOffset += data.length;
  165. });
  166. return buffer;
  167. };
  168. module.exports = MemoryStream;