get-each.js 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import {once, on} from 'node:events';
  2. import {validateIpcMethod, disconnect, getStrictResponseError} from './validation.js';
  3. import {getIpcEmitter, isConnected} from './forward.js';
  4. import {addReference, removeReference} from './reference.js';
  5. // Like `[sub]process.on('message')` but promise-based
  6. export const getEachMessage = ({anyProcess, channel, isSubprocess, ipc}, {reference = true} = {}) => loopOnMessages({
  7. anyProcess,
  8. channel,
  9. isSubprocess,
  10. ipc,
  11. shouldAwait: !isSubprocess,
  12. reference,
  13. });
  14. // Same but used internally
  15. export const loopOnMessages = ({anyProcess, channel, isSubprocess, ipc, shouldAwait, reference}) => {
  16. validateIpcMethod({
  17. methodName: 'getEachMessage',
  18. isSubprocess,
  19. ipc,
  20. isConnected: isConnected(anyProcess),
  21. });
  22. addReference(channel, reference);
  23. const ipcEmitter = getIpcEmitter(anyProcess, channel, isSubprocess);
  24. const controller = new AbortController();
  25. const state = {};
  26. stopOnDisconnect(anyProcess, ipcEmitter, controller);
  27. abortOnStrictError({
  28. ipcEmitter,
  29. isSubprocess,
  30. controller,
  31. state,
  32. });
  33. return iterateOnMessages({
  34. anyProcess,
  35. channel,
  36. ipcEmitter,
  37. isSubprocess,
  38. shouldAwait,
  39. controller,
  40. state,
  41. reference,
  42. });
  43. };
  44. const stopOnDisconnect = async (anyProcess, ipcEmitter, controller) => {
  45. try {
  46. await once(ipcEmitter, 'disconnect', {signal: controller.signal});
  47. controller.abort();
  48. } catch {}
  49. };
  50. const abortOnStrictError = async ({ipcEmitter, isSubprocess, controller, state}) => {
  51. try {
  52. const [error] = await once(ipcEmitter, 'strict:error', {signal: controller.signal});
  53. state.error = getStrictResponseError(error, isSubprocess);
  54. controller.abort();
  55. } catch {}
  56. };
  57. const iterateOnMessages = async function * ({anyProcess, channel, ipcEmitter, isSubprocess, shouldAwait, controller, state, reference}) {
  58. try {
  59. for await (const [message] of on(ipcEmitter, 'message', {signal: controller.signal})) {
  60. throwIfStrictError(state);
  61. yield message;
  62. }
  63. } catch {
  64. throwIfStrictError(state);
  65. } finally {
  66. controller.abort();
  67. removeReference(channel, reference);
  68. if (!isSubprocess) {
  69. disconnect(anyProcess);
  70. }
  71. if (shouldAwait) {
  72. await anyProcess;
  73. }
  74. }
  75. };
  76. const throwIfStrictError = ({error}) => {
  77. if (error) {
  78. throw error;
  79. }
  80. };