permessage-deflate.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. 'use strict';
  2. const zlib = require('zlib');
  3. const bufferUtil = require('./buffer-util');
  4. const Limiter = require('./limiter');
  5. const { kStatusCode, NOOP } = require('./constants');
  // Four bytes (0x00 0x00 0xff 0xff) written to the inflate stream after the
  // final fragment of a message.
  const TRAILER = Buffer.from('0000ffff', 'hex');

  // Symbol keys used to attach per-stream bookkeeping to the zlib streams and
  // to the errors they produce.
  const kPerMessageDeflate = Symbol('permessage-deflate');
  const kTotalLength = Symbol('total-length');
  const kCallback = Symbol('callback');
  const kBuffers = Symbol('buffers');
  const kError = Symbol('error');

  //
  // zlib concurrency is limited to prevent severe memory fragmentation, as
  // documented in
  // https://github.com/nodejs/node/issues/8871#issuecomment-250915913 and
  // https://github.com/websockets/ws/issues/1202
  //
  // The limiter is intentionally shared by every instance: the problem is the
  // global thread pool, not any single connection. It is created lazily by the
  // first `PerMessageDeflate` instance.
  //
  let zlibLimiter;
  /**
   * permessage-deflate implementation.
   *
   * An instance negotiates the extension parameters for one connection
   * (`offer()` / `accept()`) and then compresses and decompresses message
   * fragments with raw zlib streams that are created lazily on first use.
   */
  class PerMessageDeflate {
    /**
     * Creates a PerMessageDeflate instance.
     *
     * @param {Object} options Configuration options
     * @param {Boolean} options.serverNoContextTakeover Request/accept disabling
     *     of server context takeover
     * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge
     *     disabling of client context takeover
     * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the
     *     use of a custom server window size
     * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support
     *     for, or request, a custom client window size
     * @param {Object} options.zlibDeflateOptions Options to pass to zlib on
     *     deflate
     * @param {Object} options.zlibInflateOptions Options to pass to zlib on
     *     inflate
     * @param {Number} options.threshold Size (in bytes) below which messages
     *     should not be compressed (defaults to 1024)
     * @param {Number} options.concurrencyLimit The number of concurrent calls
     *     to zlib (defaults to 10). The limiter is shared by all instances, so
     *     this only has an effect on the instance that creates it
     * @param {Boolean} isServer Create the instance in either server or client
     *     mode
     * @param {Number} maxPayload The maximum allowed message length
     */
    constructor(options, isServer, maxPayload) {
      // `| 0` turns a missing or non-numeric value into 0; the inflate `'data'`
      // listener treats any value below 1 as "no limit".
      this._maxPayload = maxPayload | 0;
      this._options = options || {};
      this._threshold =
        this._options.threshold !== undefined ? this._options.threshold : 1024;
      this._isServer = !!isServer;
      this._deflate = null;
      this._inflate = null;

      // Negotiated parameters, set by `accept()`.
      this.params = null;

      if (!zlibLimiter) {
        const concurrency =
          this._options.concurrencyLimit !== undefined
            ? this._options.concurrencyLimit
            : 10;

        zlibLimiter = new Limiter(concurrency);
      }
    }

    /**
     * @type {String}
     */
    static get extensionName() {
      return 'permessage-deflate';
    }

    /**
     * Create an extension negotiation offer.
     *
     * @return {Object} Extension parameters
     * @public
     */
    offer() {
      const params = {};

      if (this._options.serverNoContextTakeover) {
        params.server_no_context_takeover = true;
      }
      if (this._options.clientNoContextTakeover) {
        params.client_no_context_takeover = true;
      }
      if (this._options.serverMaxWindowBits) {
        params.server_max_window_bits = this._options.serverMaxWindowBits;
      }
      if (this._options.clientMaxWindowBits) {
        params.client_max_window_bits = this._options.clientMaxWindowBits;
      } else if (this._options.clientMaxWindowBits == null) {
        // Option not set at all (as opposed to an explicit `false`): advertise
        // support for the parameter without requesting a specific value.
        params.client_max_window_bits = true;
      }

      return params;
    }

    /**
     * Accept an extension negotiation offer/response.
     *
     * The accepted configuration is also stored in `this.params`.
     *
     * @param {Array} configurations The extension negotiation offers/response
     * @return {Object} Accepted configuration
     * @throws {Error} If the parameters are invalid or cannot be accepted
     * @public
     */
    accept(configurations) {
      configurations = this.normalizeParams(configurations);

      this.params = this._isServer
        ? this.acceptAsServer(configurations)
        : this.acceptAsClient(configurations);

      return this.params;
    }

    /**
     * Releases all resources used by the extension.
     *
     * @public
     */
    cleanup() {
      if (this._inflate) {
        this._inflate.close();
        this._inflate = null;
      }

      if (this._deflate) {
        if (this._deflate[kCallback]) {
          // A compression is still pending. Invoke its callback without
          // arguments so that the wrapper installed by `compress()` releases
          // its limiter slot; that wrapper does not forward the call when both
          // the error and the result are missing.
          this._deflate[kCallback]();
        }

        this._deflate.close();
        this._deflate = null;
      }
    }

    /**
     * Accept an extension negotiation offer.
     *
     * The first offer compatible with the local options is selected, then
     * adjusted in place to become the response.
     *
     * @param {Array} offers The extension negotiation offers
     * @return {Object} Accepted configuration
     * @throws {Error} If none of the offers can be accepted
     * @private
     */
    acceptAsServer(offers) {
      const opts = this._options;
      const accepted = offers.find((params) => {
        if (
          (opts.serverNoContextTakeover === false &&
            params.server_no_context_takeover) ||
          (params.server_max_window_bits &&
            (opts.serverMaxWindowBits === false ||
              (typeof opts.serverMaxWindowBits === 'number' &&
                opts.serverMaxWindowBits > params.server_max_window_bits))) ||
          (typeof opts.clientMaxWindowBits === 'number' &&
            !params.client_max_window_bits)
        ) {
          return false;
        }

        return true;
      });

      if (!accepted) {
        throw new Error('None of the extension offers can be accepted');
      }

      if (opts.serverNoContextTakeover) {
        accepted.server_no_context_takeover = true;
      }
      if (opts.clientNoContextTakeover) {
        accepted.client_no_context_takeover = true;
      }
      if (typeof opts.serverMaxWindowBits === 'number') {
        accepted.server_max_window_bits = opts.serverMaxWindowBits;
      }
      if (typeof opts.clientMaxWindowBits === 'number') {
        //
        // When the client offered `client_max_window_bits` with a value, the
        // response must carry a value equal to or smaller than the offered one
        // (RFC 7692, section 7.1.2.2). `acceptAsClient()` rejects a larger
        // value, so never answer with more than what was offered.
        //
        const offered = accepted.client_max_window_bits;

        accepted.client_max_window_bits =
          typeof offered === 'number'
            ? Math.min(offered, opts.clientMaxWindowBits)
            : opts.clientMaxWindowBits;
      } else if (
        accepted.client_max_window_bits === true ||
        opts.clientMaxWindowBits === false
      ) {
        delete accepted.client_max_window_bits;
      }

      return accepted;
    }

    /**
     * Accept the extension negotiation response.
     *
     * @param {Array} response The extension negotiation response
     * @return {Object} Accepted configuration
     * @throws {Error} If the response contains a parameter that conflicts with
     *     the local options
     * @private
     */
    acceptAsClient(response) {
      const params = response[0];

      if (
        this._options.clientNoContextTakeover === false &&
        params.client_no_context_takeover
      ) {
        throw new Error('Unexpected parameter "client_no_context_takeover"');
      }

      if (!params.client_max_window_bits) {
        // The server did not pick a value: fall back to the locally configured
        // one, if any.
        if (typeof this._options.clientMaxWindowBits === 'number') {
          params.client_max_window_bits = this._options.clientMaxWindowBits;
        }
      } else if (
        this._options.clientMaxWindowBits === false ||
        (typeof this._options.clientMaxWindowBits === 'number' &&
          params.client_max_window_bits > this._options.clientMaxWindowBits)
      ) {
        throw new Error(
          'Unexpected or invalid parameter "client_max_window_bits"'
        );
      }

      return params;
    }

    /**
     * Normalize parameters.
     *
     * Each parameter is expected to be a list of values; it is replaced in
     * place by its single, validated value.
     *
     * @param {Array} configurations The extension negotiation offers/response
     * @return {Array} The offers/response with normalized parameters
     * @throws {Error} If a parameter is unknown or has more than one value
     * @throws {TypeError} If a parameter has an invalid value
     * @private
     */
    normalizeParams(configurations) {
      configurations.forEach((params) => {
        Object.keys(params).forEach((key) => {
          let value = params[key];

          if (value.length > 1) {
            throw new Error(`Parameter "${key}" must have only a single value`);
          }

          value = value[0];

          if (key === 'client_max_window_bits') {
            if (value !== true) {
              const num = +value;

              if (!Number.isInteger(num) || num < 8 || num > 15) {
                throw new TypeError(
                  `Invalid value for parameter "${key}": ${value}`
                );
              }
              value = num;
            } else if (!this._isServer) {
              // A value-less parameter is only accepted in server mode, i.e.
              // when it comes from an offer.
              throw new TypeError(
                `Invalid value for parameter "${key}": ${value}`
              );
            }
          } else if (key === 'server_max_window_bits') {
            const num = +value;

            if (!Number.isInteger(num) || num < 8 || num > 15) {
              throw new TypeError(
                `Invalid value for parameter "${key}": ${value}`
              );
            }
            value = num;
          } else if (
            key === 'client_no_context_takeover' ||
            key === 'server_no_context_takeover'
          ) {
            if (value !== true) {
              throw new TypeError(
                `Invalid value for parameter "${key}": ${value}`
              );
            }
          } else {
            throw new Error(`Unknown parameter "${key}"`);
          }

          params[key] = value;
        });
      });

      return configurations;
    }

    /**
     * Decompress data. Concurrency limited.
     *
     * @param {Buffer} data Compressed data
     * @param {Boolean} fin Specifies whether or not this is the last fragment
     * @param {Function} callback Callback
     * @public
     */
    decompress(data, fin, callback) {
      zlibLimiter.add((done) => {
        this._decompress(data, fin, (err, result) => {
          done();
          callback(err, result);
        });
      });
    }

    /**
     * Compress data. Concurrency limited.
     *
     * The callback is not invoked when the pending operation is aborted by
     * `cleanup()`, which signals the abort by passing neither an error nor a
     * result.
     *
     * @param {Buffer} data Data to compress
     * @param {Boolean} fin Specifies whether or not this is the last fragment
     * @param {Function} callback Callback
     * @public
     */
    compress(data, fin, callback) {
      zlibLimiter.add((done) => {
        this._compress(data, fin, (err, result) => {
          done();
          if (err || result) {
            callback(err, result);
          }
        });
      });
    }

    /**
     * Decompress data.
     *
     * @param {Buffer} data Compressed data
     * @param {Boolean} fin Specifies whether or not this is the last fragment
     * @param {Function} callback Callback
     * @private
     */
    _decompress(data, fin, callback) {
      // Incoming data was compressed by the remote peer, so its parameters
      // apply.
      const endpoint = this._isServer ? 'client' : 'server';

      if (!this._inflate) {
        const key = `${endpoint}_max_window_bits`;
        const windowBits =
          typeof this.params[key] !== 'number'
            ? zlib.Z_DEFAULT_WINDOWBITS
            : this.params[key];

        this._inflate = zlib.createInflateRaw({
          ...this._options.zlibInflateOptions,
          windowBits
        });
        this._inflate[kPerMessageDeflate] = this;
        this._inflate[kTotalLength] = 0;
        this._inflate[kBuffers] = [];
        this._inflate.on('error', inflateOnError);
        this._inflate.on('data', inflateOnData);
      }

      this._inflate[kCallback] = callback;

      this._inflate.write(data);
      // Restore the 4-byte tail that `_compress()` strips from the last
      // fragment of a message.
      if (fin) this._inflate.write(TRAILER);

      this._inflate.flush(() => {
        // Set by the `'data'` listener when the payload limit is exceeded.
        const err = this._inflate[kError];

        if (err) {
          this._inflate.close();
          this._inflate = null;
          callback(err);
          return;
        }

        const result = bufferUtil.concat(
          this._inflate[kBuffers],
          this._inflate[kTotalLength]
        );

        if (fin && this.params[`${endpoint}_no_context_takeover`]) {
          // No context takeover: the stream cannot be reused for the next
          // message.
          this._inflate.close();
          this._inflate = null;
        } else {
          this._inflate[kTotalLength] = 0;
          this._inflate[kBuffers] = [];
        }

        callback(null, result);
      });
    }

    /**
     * Compress data.
     *
     * @param {Buffer} data Data to compress
     * @param {Boolean} fin Specifies whether or not this is the last fragment
     * @param {Function} callback Callback
     * @private
     */
    _compress(data, fin, callback) {
      // Outgoing data is compressed by the local peer, so its parameters apply.
      const endpoint = this._isServer ? 'server' : 'client';

      if (!this._deflate) {
        const key = `${endpoint}_max_window_bits`;
        const windowBits =
          typeof this.params[key] !== 'number'
            ? zlib.Z_DEFAULT_WINDOWBITS
            : this.params[key];

        this._deflate = zlib.createDeflateRaw({
          ...this._options.zlibDeflateOptions,
          windowBits
        });

        this._deflate[kTotalLength] = 0;
        this._deflate[kBuffers] = [];

        //
        // An `'error'` event is emitted, only on Node.js < 10.0.0, if the
        // `zlib.DeflateRaw` instance is closed while data is being processed.
        // This can happen if `PerMessageDeflate#cleanup()` is called at the
        // wrong time due to an abnormal WebSocket closure.
        //
        this._deflate.on('error', NOOP);
        this._deflate.on('data', deflateOnData);
      }

      this._deflate[kCallback] = callback;

      this._deflate.write(data);
      this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
        if (!this._deflate) {
          //
          // This `if` statement is only needed for Node.js < 10.0.0 because as
          // of commit https://github.com/nodejs/node/commit/5e3f5164, the flush
          // callback is no longer called if the deflate stream is closed while
          // data is being processed.
          //
          return;
        }

        let result = bufferUtil.concat(
          this._deflate[kBuffers],
          this._deflate[kTotalLength]
        );

        //
        // Drop the last 4 bytes of the final fragment; `_decompress()` writes
        // `TRAILER` back before inflating. `subarray()` is used because
        // `Buffer#slice()` is deprecated; both return a view without copying.
        //
        if (fin) result = result.subarray(0, result.length - 4);

        //
        // Ensure that the callback will not be called again in
        // `PerMessageDeflate#cleanup()`.
        //
        this._deflate[kCallback] = null;

        if (fin && this.params[`${endpoint}_no_context_takeover`]) {
          // No context takeover: the stream cannot be reused for the next
          // message.
          this._deflate.close();
          this._deflate = null;
        } else {
          this._deflate[kTotalLength] = 0;
          this._deflate[kBuffers] = [];
        }

        callback(null, result);
      });
    }
  }
  404. module.exports = PerMessageDeflate;
  /**
   * `'data'` event listener for the `zlib.DeflateRaw` stream. Invoked with the
   * stream as `this`; stores the chunk and adds its size to the running total
   * kept on the stream.
   *
   * @param {Buffer} chunk A chunk of data
   * @private
   */
  function deflateOnData(chunk) {
    const stream = this;

    stream[kBuffers].push(chunk);
    stream[kTotalLength] = stream[kTotalLength] + chunk.length;
  }
  /**
   * `'data'` event listener for the `zlib.InflateRaw` stream. Invoked with the
   * stream as `this`.
   *
   * The chunk size is always added to the running total. The chunk itself is
   * kept only while the total stays within the owner's `_maxPayload` (a limit
   * below 1 means no limit). Otherwise a `RangeError` tagged with status code
   * 1009 is stored on the stream, this listener is removed and the stream is
   * reset.
   *
   * @param {Buffer} chunk A chunk of data
   * @private
   */
  function inflateOnData(chunk) {
    this[kTotalLength] += chunk.length;

    const limit = this[kPerMessageDeflate]._maxPayload;
    const withinLimit = limit < 1 || this[kTotalLength] <= limit;

    if (!withinLimit) {
      const error = new RangeError('Max payload size exceeded');

      error[kStatusCode] = 1009;
      this[kError] = error;
      this.removeListener('data', inflateOnData);
      this.reset();
      return;
    }

    this[kBuffers].push(chunk);
  }
  /**
   * `'error'` event listener for the `zlib.InflateRaw` stream. Invoked with
   * the stream as `this`; detaches the stream from its owner, tags the error
   * with status code 1007 and hands it to the pending callback.
   *
   * @param {Error} err The emitted error
   * @private
   */
  function inflateOnError(err) {
    const owner = this[kPerMessageDeflate];

    //
    // `Zlib#close()` does not need to be called here: the handle is closed
    // automatically when an error is emitted.
    //
    owner._inflate = null;

    err[kStatusCode] = 1007;
    this[kCallback](err);
  }