wait-subprocess.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. import {once} from 'node:events';
  2. import {isStream as isNodeStream} from 'is-stream';
  3. import {throwOnTimeout} from '../terminate/timeout.js';
  4. import {throwOnCancel} from '../terminate/cancel.js';
  5. import {throwOnGracefulCancel} from '../terminate/graceful.js';
  6. import {isStandardStream} from '../utils/standard-stream.js';
  7. import {TRANSFORM_TYPES} from '../stdio/type.js';
  8. import {getBufferedData} from '../io/contents.js';
  9. import {waitForIpcOutput, getBufferedIpcOutput} from '../ipc/buffer-messages.js';
  10. import {sendIpcInput} from '../ipc/ipc-input.js';
  11. import {waitForAllStream} from './all-async.js';
  12. import {waitForStdioStreams} from './stdio.js';
  13. import {waitForExit, waitForSuccessfulExit} from './exit-async.js';
  14. import {waitForStream} from './wait-stream.js';
  15. // Retrieve result of subprocess: exit code, signal, error, streams (stdout/stderr/all)
  16. export const waitForSubprocessResult = async ({
  17. subprocess,
  18. options: {
  19. encoding,
  20. buffer,
  21. maxBuffer,
  22. lines,
  23. timeoutDuration: timeout,
  24. cancelSignal,
  25. gracefulCancel,
  26. forceKillAfterDelay,
  27. stripFinalNewline,
  28. ipc,
  29. ipcInput,
  30. },
  31. context,
  32. verboseInfo,
  33. fileDescriptors,
  34. originalStreams,
  35. onInternalError,
  36. controller,
  37. }) => {
  38. const exitPromise = waitForExit(subprocess, context);
  39. const streamInfo = {
  40. originalStreams,
  41. fileDescriptors,
  42. subprocess,
  43. exitPromise,
  44. propagating: false,
  45. };
  46. const stdioPromises = waitForStdioStreams({
  47. subprocess,
  48. encoding,
  49. buffer,
  50. maxBuffer,
  51. lines,
  52. stripFinalNewline,
  53. verboseInfo,
  54. streamInfo,
  55. });
  56. const allPromise = waitForAllStream({
  57. subprocess,
  58. encoding,
  59. buffer,
  60. maxBuffer,
  61. lines,
  62. stripFinalNewline,
  63. verboseInfo,
  64. streamInfo,
  65. });
  66. const ipcOutput = [];
  67. const ipcOutputPromise = waitForIpcOutput({
  68. subprocess,
  69. buffer,
  70. maxBuffer,
  71. ipc,
  72. ipcOutput,
  73. verboseInfo,
  74. });
  75. const originalPromises = waitForOriginalStreams(originalStreams, subprocess, streamInfo);
  76. const customStreamsEndPromises = waitForCustomStreamsEnd(fileDescriptors, streamInfo);
  77. try {
  78. return await Promise.race([
  79. Promise.all([
  80. {},
  81. waitForSuccessfulExit(exitPromise),
  82. Promise.all(stdioPromises),
  83. allPromise,
  84. ipcOutputPromise,
  85. sendIpcInput(subprocess, ipcInput),
  86. ...originalPromises,
  87. ...customStreamsEndPromises,
  88. ]),
  89. onInternalError,
  90. throwOnSubprocessError(subprocess, controller),
  91. ...throwOnTimeout(subprocess, timeout, context, controller),
  92. ...throwOnCancel({
  93. subprocess,
  94. cancelSignal,
  95. gracefulCancel,
  96. context,
  97. controller,
  98. }),
  99. ...throwOnGracefulCancel({
  100. subprocess,
  101. cancelSignal,
  102. gracefulCancel,
  103. forceKillAfterDelay,
  104. context,
  105. controller,
  106. }),
  107. ]);
  108. } catch (error) {
  109. context.terminationReason ??= 'other';
  110. return Promise.all([
  111. {error},
  112. exitPromise,
  113. Promise.all(stdioPromises.map(stdioPromise => getBufferedData(stdioPromise))),
  114. getBufferedData(allPromise),
  115. getBufferedIpcOutput(ipcOutputPromise, ipcOutput),
  116. Promise.allSettled(originalPromises),
  117. Promise.allSettled(customStreamsEndPromises),
  118. ]);
  119. }
  120. };
  121. // Transforms replace `subprocess.std*`, which means they are not exposed to users.
  122. // However, we still want to wait for their completion.
  123. const waitForOriginalStreams = (originalStreams, subprocess, streamInfo) =>
  124. originalStreams.map((stream, fdNumber) => stream === subprocess.stdio[fdNumber]
  125. ? undefined
  126. : waitForStream(stream, fdNumber, streamInfo));
  127. // Some `stdin`/`stdout`/`stderr` options create a stream, e.g. when passing a file path.
  128. // The `.pipe()` method automatically ends that stream when `subprocess` ends.
  129. // This makes sure we wait for the completion of those streams, in order to catch any error.
  130. const waitForCustomStreamsEnd = (fileDescriptors, streamInfo) => fileDescriptors.flatMap(({stdioItems}, fdNumber) => stdioItems
  131. .filter(({value, stream = value}) => isNodeStream(stream, {checkOpen: false}) && !isStandardStream(stream))
  132. .map(({type, value, stream = value}) => waitForStream(stream, fdNumber, streamInfo, {
  133. isSameDirection: TRANSFORM_TYPES.has(type),
  134. stopOnExit: type === 'native',
  135. })));
  136. // Fails when the subprocess emits an `error` event
  137. const throwOnSubprocessError = async (subprocess, {signal}) => {
  138. const [error] = await once(subprocess, 'error', {signal});
  139. throw error;
  140. };