get-one.js 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import {once, on} from 'node:events';
  2. import {
  3. validateIpcMethod,
  4. throwOnEarlyDisconnect,
  5. disconnect,
  6. getStrictResponseError,
  7. } from './validation.js';
  8. import {getIpcEmitter, isConnected} from './forward.js';
  9. import {addReference, removeReference} from './reference.js';
  10. // Like `[sub]process.once('message')` but promise-based
  11. export const getOneMessage = ({anyProcess, channel, isSubprocess, ipc}, {reference = true, filter} = {}) => {
  12. validateIpcMethod({
  13. methodName: 'getOneMessage',
  14. isSubprocess,
  15. ipc,
  16. isConnected: isConnected(anyProcess),
  17. });
  18. return getOneMessageAsync({
  19. anyProcess,
  20. channel,
  21. isSubprocess,
  22. filter,
  23. reference,
  24. });
  25. };
  26. const getOneMessageAsync = async ({anyProcess, channel, isSubprocess, filter, reference}) => {
  27. addReference(channel, reference);
  28. const ipcEmitter = getIpcEmitter(anyProcess, channel, isSubprocess);
  29. const controller = new AbortController();
  30. try {
  31. return await Promise.race([
  32. getMessage(ipcEmitter, filter, controller),
  33. throwOnDisconnect(ipcEmitter, isSubprocess, controller),
  34. throwOnStrictError(ipcEmitter, isSubprocess, controller),
  35. ]);
  36. } catch (error) {
  37. disconnect(anyProcess);
  38. throw error;
  39. } finally {
  40. controller.abort();
  41. removeReference(channel, reference);
  42. }
  43. };
  44. const getMessage = async (ipcEmitter, filter, {signal}) => {
  45. if (filter === undefined) {
  46. const [message] = await once(ipcEmitter, 'message', {signal});
  47. return message;
  48. }
  49. for await (const [message] of on(ipcEmitter, 'message', {signal})) {
  50. if (filter(message)) {
  51. return message;
  52. }
  53. }
  54. };
  55. const throwOnDisconnect = async (ipcEmitter, isSubprocess, {signal}) => {
  56. await once(ipcEmitter, 'disconnect', {signal});
  57. throwOnEarlyDisconnect(isSubprocess);
  58. };
  59. const throwOnStrictError = async (ipcEmitter, isSubprocess, {signal}) => {
  60. const [error] = await once(ipcEmitter, 'strict:error', {signal});
  61. throw getStrictResponseError(error, isSubprocess);
  62. };