strict.js 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import {once} from 'node:events';
  2. import {createDeferred} from '../utils/deferred.js';
  3. import {incrementMaxListeners} from '../utils/max-listeners.js';
  4. import {sendMessage} from './send.js';
  5. import {throwOnMissingStrict, throwOnStrictDisconnect, throwOnStrictDeadlockError} from './validation.js';
  6. import {getIpcEmitter} from './forward.js';
  7. import {hasMessageListeners} from './outgoing.js';
  // Counter providing the `id` of each `strict` request sent by this process.
  // It is a BigInt and is bumped once per wrapped message.
  let count = 0n;

  // When using the `strict` option, `sendMessage()` does not send the message as is.
  // Instead, it sends an envelope made of:
  //  - `id`: the current value of the counter above
  //  - `type`: the request marker
  //  - `message`: the original message
  //  - `hasListeners`: the result of `hasMessageListeners()` for the current process
  // Without the `strict` option, the message is returned untouched.
  export const handleSendStrict = ({anyProcess, channel, isSubprocess, message, strict}) => {
    if (strict) {
      const emitter = getIpcEmitter(anyProcess, channel, isSubprocess);
      const hasListeners = hasMessageListeners(anyProcess, emitter);

      // Same as a post-increment: use the current value, then bump the counter
      const id = count;
      count += 1n;

      return {
        id,
        type: REQUEST_TYPE,
        message,
        hasListeners,
      };
    }

    return message;
  };
  // Handles when both processes are calling `sendMessage()` with `strict` at the same time.
  // If neither process is listening, this would create a deadlock. We detect it and throw.
  //
  // `wrappedMessage` is the incoming message. Nothing happens unless it is a `strict`
  // request whose `hasListeners` flag is falsy.
  // `outgoingMessages` is iterated, and every item with a defined `id` gets its pending
  // deferred resolved with `{isDeadlock: true, hasListeners: false}`, which makes
  // `waitForStrictResponse()` call `throwOnStrictDeadlockError()`.
  // Presumably `outgoingMessages` holds the messages currently being sent by this process:
  // verify against the caller.
  export const validateStrictDeadlock = (outgoingMessages, wrappedMessage) => {
    if (wrappedMessage?.type !== REQUEST_TYPE || wrappedMessage.hasListeners) {
      return;
    }

    for (const {id} of outgoingMessages) {
      if (id === undefined) {
        continue;
      }

      // `waitForStrictResponse()` deletes its entry as soon as it settles, so an `id` that
      // the caller still lists may no longer have a pending deferred. Skip it instead of
      // throwing a `TypeError`, like `handleStrictResponse()` does.
      STRICT_RESPONSES[id]?.resolve({isDeadlock: true, hasListeners: false});
    }
  };
  // Reception of a `strict` request: the other process then sends the acknowledgment back
  // as a response.
  // The acknowledgment reuses the request `id`, and its `message` is the result of
  // `hasMessageListeners()` for the current process.
  // If sending the acknowledgment fails, the error is emitted as a `strict:error` event on
  // `ipcEmitter` instead of being thrown.
  // Returns the unwrapped message. Anything which is not a `strict` request, or any request
  // received while `anyProcess.connected` is falsy, is returned as is.
  export const handleStrictRequest = async ({wrappedMessage, anyProcess, channel, isSubprocess, ipcEmitter}) => {
    const shouldAcknowledge = wrappedMessage?.type === REQUEST_TYPE && anyProcess.connected;
    if (!shouldAcknowledge) {
      return wrappedMessage;
    }

    const {id, message: unwrappedMessage} = wrappedMessage;
    const acknowledgment = {
      id,
      type: RESPONSE_TYPE,
      message: hasMessageListeners(anyProcess, ipcEmitter),
    };
    const sendOptions = {
      anyProcess,
      channel,
      isSubprocess,
      ipc: true,
    };

    try {
      await sendMessage(sendOptions, acknowledgment);
    } catch (error) {
      ipcEmitter.emit('strict:error', error);
    }

    return unwrappedMessage;
  };
  // Reception of the acknowledgment response.
  // Returns `true` when `wrappedMessage` is such a response, `false` otherwise.
  // When a deferred is pending for the response `id`, it is resolved with the
  // `hasListeners` value carried by the response. Otherwise, the response is ignored.
  export const handleStrictResponse = wrappedMessage => {
    const isResponse = wrappedMessage?.type === RESPONSE_TYPE;

    if (isResponse) {
      const {id: requestId, message: hasListeners} = wrappedMessage;
      const pendingResponse = STRICT_RESPONSES[requestId];
      pendingResponse?.resolve({isDeadlock: false, hasListeners});
    }

    return isResponse;
  };
  // Wait for the other process to receive the message from `sendMessage()`.
  // Does nothing unless `wrappedMessage` is a `strict` request.
  // A deferred is registered under the request `id`, then raced against
  // `throwOnDisconnect()`. Once the deferred is resolved:
  //  - `isDeadlock` leads to `throwOnStrictDeadlockError()`
  //  - a falsy `hasListeners` leads to `throwOnMissingStrict()`
  // In all cases, the `disconnect` listener is aborted and the deferred is unregistered.
  export const waitForStrictResponse = async (wrappedMessage, anyProcess, isSubprocess) => {
    if (wrappedMessage?.type !== REQUEST_TYPE) {
      return;
    }

    // This must remain synchronous, i.e. happen before the first `await`
    const pendingResponse = createDeferred();
    STRICT_RESPONSES[wrappedMessage.id] = pendingResponse;
    const disconnectController = new AbortController();

    try {
      const onDisconnect = throwOnDisconnect(anyProcess, isSubprocess, disconnectController);
      const outcome = await Promise.race([pendingResponse, onDisconnect]);
      const {isDeadlock, hasListeners} = outcome;

      if (isDeadlock) {
        throwOnStrictDeadlockError(isSubprocess);
      }

      if (!hasListeners) {
        throwOnMissingStrict(isSubprocess);
      }
    } finally {
      disconnectController.abort();
      delete STRICT_RESPONSES[wrappedMessage.id];
    }
  };
  // Registry of the `strict` requests still waiting for their acknowledgment.
  // Keys are request `id`s. Values are the deferreds set by `waitForStrictResponse()`,
  // which also removes them once it settles.
  const STRICT_RESPONSES = {};
  // Waits for `anyProcess` to emit `disconnect`, then calls `throwOnStrictDisconnect()`.
  // The third argument is an object with a `signal` property, e.g. an `AbortController`.
  // That signal is passed both to `incrementMaxListeners()` and to `once()`, so aborting it
  // stops the wait.
  const throwOnDisconnect = async (anyProcess, isSubprocess, abortController) => {
    const {signal} = abortController;
    incrementMaxListeners(anyProcess, 1, signal);

    const disconnected = once(anyProcess, 'disconnect', {signal});
    await disconnected;

    throwOnStrictDisconnect(isSubprocess);
  };
  // `type` of the envelope wrapping a message sent with the `strict` option
  const REQUEST_TYPE = 'execa:ipc:request';

  // `type` of the acknowledgment sent back when such an envelope is received
  const RESPONSE_TYPE = 'execa:ipc:response';