incoming.js 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import {once} from 'node:events';
  2. import {scheduler} from 'node:timers/promises';
  3. import {waitForOutgoingMessages} from './outgoing.js';
  4. import {redoAddedReferences} from './reference.js';
  5. import {handleStrictRequest, handleStrictResponse} from './strict.js';
  6. import {handleAbort, abortOnDisconnect} from './graceful.js';
  7. // By default, Node.js buffers `message` events.
  8. // - Buffering happens when a `message` event is emitted but there is no handler.
  9. // - As soon as a `message` event handler is set, all buffered `message` events are emitted, emptying the buffer.
  10. // - This happens both in the current process and the subprocess.
  11. // - See https://github.com/nodejs/node/blob/501546e8f37059cd577041e23941b640d0d4d406/lib/internal/child_process.js#L719
  12. // This is helpful. Notably, this allows sending messages to a subprocess that's still initializing.
  13. // However, it has several problems.
  14. // - This works with `events.on()` but not `events.once()` since all buffered messages are emitted at once.
  15. // For example, users cannot call `await getOneMessage()`/`getEachMessage()` multiple times in a row.
  16. // - When a user intentionally starts listening to `message` at a specific point in time, past `message` events are replayed, which might be unexpected.
  17. // - Buffering is unlimited, which might lead to an out-of-memory crash.
  18. // - This does not work well with multiple consumers.
  19. // For example, Execa consumes events with both `result.ipcOutput` and manual IPC calls like `getOneMessage()`.
  20. // Since `result.ipcOutput` reads all incoming messages, no buffering happens for manual IPC calls.
  21. // - Forgetting to set up a `message` listener, or setting it up too late, is a programming mistake.
  22. // The default behavior does not allow users to realize they made that mistake.
  23. // To solve those problems, instead of buffering messages, we debounce them.
  24. // The `message` event is debounced so it is emitted at most once per macrotask.
  25. export const onMessage = async ({anyProcess, channel, isSubprocess, ipcEmitter}, wrappedMessage) => {
  26. if (handleStrictResponse(wrappedMessage) || handleAbort(wrappedMessage)) {
  27. return;
  28. }
  29. if (!INCOMING_MESSAGES.has(anyProcess)) {
  30. INCOMING_MESSAGES.set(anyProcess, []);
  31. }
  32. const incomingMessages = INCOMING_MESSAGES.get(anyProcess);
  33. incomingMessages.push(wrappedMessage);
  34. if (incomingMessages.length > 1) {
  35. return;
  36. }
  37. while (incomingMessages.length > 0) {
  38. // eslint-disable-next-line no-await-in-loop
  39. await waitForOutgoingMessages(anyProcess, ipcEmitter, wrappedMessage);
  40. // eslint-disable-next-line no-await-in-loop
  41. await scheduler.yield();
  42. // eslint-disable-next-line no-await-in-loop
  43. const message = await handleStrictRequest({
  44. wrappedMessage: incomingMessages[0],
  45. anyProcess,
  46. channel,
  47. isSubprocess,
  48. ipcEmitter,
  49. });
  50. incomingMessages.shift();
  51. ipcEmitter.emit('message', message);
  52. ipcEmitter.emit('message:done');
  53. }
  54. };
  55. // If the `message` event is currently debounced, the `disconnect` event must wait for it
  56. export const onDisconnect = async ({anyProcess, channel, isSubprocess, ipcEmitter, boundOnMessage}) => {
  57. abortOnDisconnect();
  58. const incomingMessages = INCOMING_MESSAGES.get(anyProcess);
  59. while (incomingMessages?.length > 0) {
  60. // eslint-disable-next-line no-await-in-loop
  61. await once(ipcEmitter, 'message:done');
  62. }
  63. anyProcess.removeListener('message', boundOnMessage);
  64. redoAddedReferences(channel, isSubprocess);
  65. ipcEmitter.connected = false;
  66. ipcEmitter.emit('disconnect');
  67. };
  68. const INCOMING_MESSAGES = new WeakMap();