queue.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. 'use strict';
  2. Object.defineProperty(exports, "__esModule", {
  3. value: true
  4. });
  5. exports.default = queue;
  6. var _baseIndexOf = require('lodash/_baseIndexOf');
  7. var _baseIndexOf2 = _interopRequireDefault(_baseIndexOf);
  8. var _isArray = require('lodash/isArray');
  9. var _isArray2 = _interopRequireDefault(_isArray);
  10. var _noop = require('lodash/noop');
  11. var _noop2 = _interopRequireDefault(_noop);
  12. var _rest = require('./rest');
  13. var _rest2 = _interopRequireDefault(_rest);
  14. var _onlyOnce = require('./onlyOnce');
  15. var _onlyOnce2 = _interopRequireDefault(_onlyOnce);
  16. var _setImmediate = require('./setImmediate');
  17. var _setImmediate2 = _interopRequireDefault(_setImmediate);
  18. var _DoublyLinkedList = require('./DoublyLinkedList');
  19. var _DoublyLinkedList2 = _interopRequireDefault(_DoublyLinkedList);
  20. function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
  21. function queue(worker, concurrency, payload) {
  22. if (concurrency == null) {
  23. concurrency = 1;
  24. } else if (concurrency === 0) {
  25. throw new Error('Concurrency must not be zero');
  26. }
  27. function _insert(data, insertAtFront, callback) {
  28. if (callback != null && typeof callback !== 'function') {
  29. throw new Error('task callback must be a function');
  30. }
  31. q.started = true;
  32. if (!(0, _isArray2.default)(data)) {
  33. data = [data];
  34. }
  35. if (data.length === 0 && q.idle()) {
  36. // call drain immediately if there are no tasks
  37. return (0, _setImmediate2.default)(function () {
  38. q.drain();
  39. });
  40. }
  41. for (var i = 0, l = data.length; i < l; i++) {
  42. var item = {
  43. data: data[i],
  44. callback: callback || _noop2.default
  45. };
  46. if (insertAtFront) {
  47. q._tasks.unshift(item);
  48. } else {
  49. q._tasks.push(item);
  50. }
  51. }
  52. (0, _setImmediate2.default)(q.process);
  53. }
  54. function _next(tasks) {
  55. return (0, _rest2.default)(function (args) {
  56. workers -= 1;
  57. for (var i = 0, l = tasks.length; i < l; i++) {
  58. var task = tasks[i];
  59. var index = (0, _baseIndexOf2.default)(workersList, task, 0);
  60. if (index >= 0) {
  61. workersList.splice(index);
  62. }
  63. task.callback.apply(task, args);
  64. if (args[0] != null) {
  65. q.error(args[0], task.data);
  66. }
  67. }
  68. if (workers <= q.concurrency - q.buffer) {
  69. q.unsaturated();
  70. }
  71. if (q.idle()) {
  72. q.drain();
  73. }
  74. q.process();
  75. });
  76. }
  77. var workers = 0;
  78. var workersList = [];
  79. var q = {
  80. _tasks: new _DoublyLinkedList2.default(),
  81. concurrency: concurrency,
  82. payload: payload,
  83. saturated: _noop2.default,
  84. unsaturated: _noop2.default,
  85. buffer: concurrency / 4,
  86. empty: _noop2.default,
  87. drain: _noop2.default,
  88. error: _noop2.default,
  89. started: false,
  90. paused: false,
  91. push: function (data, callback) {
  92. _insert(data, false, callback);
  93. },
  94. kill: function () {
  95. q.drain = _noop2.default;
  96. q._tasks.empty();
  97. },
  98. unshift: function (data, callback) {
  99. _insert(data, true, callback);
  100. },
  101. process: function () {
  102. while (!q.paused && workers < q.concurrency && q._tasks.length) {
  103. var tasks = [],
  104. data = [];
  105. var l = q._tasks.length;
  106. if (q.payload) l = Math.min(l, q.payload);
  107. for (var i = 0; i < l; i++) {
  108. var node = q._tasks.shift();
  109. tasks.push(node);
  110. data.push(node.data);
  111. }
  112. if (q._tasks.length === 0) {
  113. q.empty();
  114. }
  115. workers += 1;
  116. workersList.push(tasks[0]);
  117. if (workers === q.concurrency) {
  118. q.saturated();
  119. }
  120. var cb = (0, _onlyOnce2.default)(_next(tasks));
  121. worker(data, cb);
  122. }
  123. },
  124. length: function () {
  125. return q._tasks.length;
  126. },
  127. running: function () {
  128. return workers;
  129. },
  130. workersList: function () {
  131. return workersList;
  132. },
  133. idle: function () {
  134. return q._tasks.length + workers === 0;
  135. },
  136. pause: function () {
  137. q.paused = true;
  138. },
  139. resume: function () {
  140. if (q.paused === false) {
  141. return;
  142. }
  143. q.paused = false;
  144. var resumeCount = Math.min(q.concurrency, q._tasks.length);
  145. // Need to call q.process once per concurrent
  146. // worker to preserve full concurrency after pause
  147. for (var w = 1; w <= resumeCount; w++) {
  148. (0, _setImmediate2.default)(q.process);
  149. }
  150. }
  151. };
  152. return q;
  153. }
  154. module.exports = exports['default'];