// wait-stream.js
  import {finished} from 'node:stream/promises';
  // Wraps `finished(stream)` to handle the following case:
  //  - When the subprocess exits, Node.js automatically calls `subprocess.stdin.destroy()`, which we need to ignore.
  //  - However, we still need to throw if `subprocess.stdin.destroy()` is called before subprocess exit.
  // When `stopOnExit` is `true`, waiting also stops as soon as `streamInfo.exitPromise` settles.
  // Any error is passed to `handleStreamError()`, unless it happened after Node.js's own cleanup of `subprocess.stdin`.
  export const waitForStream = async (stream, fdNumber, streamInfo, {isSameDirection, stopOnExit = false} = {}) => {
    const state = handleStdinDestroy(stream, streamInfo);
    const abortController = new AbortController();
    try {
      await Promise.race([
        ...(stopOnExit ? [streamInfo.exitPromise] : []),
        finished(stream, {cleanup: true, signal: abortController.signal}),
      ]);
    } catch (error) {
      // `state.stdinCleanedUp` is set by the `_destroy()` spy when the subprocess had already exited
      if (!state.stdinCleanedUp) {
        handleStreamError(error, fdNumber, streamInfo, isSameDirection);
      }
    } finally {
      // Stops `finished()` from waiting on the stream any longer, e.g. when the race was settled by `exitPromise`
      abortController.abort();
    }
  };
  // If `subprocess.stdin` is destroyed before being fully written to, it is considered aborted and should throw an error.
  // This can happen for example when user called `subprocess.stdin.destroy()` before `subprocess.stdin.end()`.
  // However, Node.js calls `subprocess.stdin.destroy()` on exit for cleanup purposes.
  // https://github.com/nodejs/node/blob/0b4cdb4b42956cbd7019058e409e06700a199e11/lib/internal/child_process.js#L278
  // This is normal and should not throw an error.
  // Therefore, we need to differentiate between both situations to know whether to throw an error.
  // Unfortunately, events (`close`, `error`, `end`, `exit`) cannot be used because `.destroy()` can take an arbitrary amount of time.
  // For example, `stdin: 'pipe'` is implemented as a TCP socket, and its `.destroy()` method waits for TCP disconnection.
  // Therefore `.destroy()` might end before or after subprocess exit, based on OS speed and load.
  // The only way to detect this is to spy on `subprocess.stdin._destroy()` by wrapping it.
  // If `subprocess.exitCode` or `subprocess.signalCode` is set, it means `.destroy()` is being called by Node.js itself.
  //
  // Returns a `{stdinCleanedUp}` state object. The spy is only installed when `stream` is the first
  // entry of `originalStreams`; for any other stream, `stdinCleanedUp` stays `false`.
  const handleStdinDestroy = (stream, {originalStreams: [originalStdin], subprocess}) => {
    const state = {stdinCleanedUp: false};
    if (stream === originalStdin) {
      spyOnStdinDestroy(stream, subprocess, state);
    }

    return state;
  };
  // Replaces `subprocessStdin._destroy()` with a wrapper that first records whether the subprocess
  // has already exited, then calls the original `_destroy()` with the same `this` and arguments.
  const spyOnStdinDestroy = (subprocessStdin, subprocess, state) => {
    const {_destroy} = subprocessStdin;
    subprocessStdin._destroy = (...destroyArguments) => {
      setStdinCleanedUp(subprocess, state);
      _destroy.call(subprocessStdin, ...destroyArguments);
    };
  };
  // Marks `state.stdinCleanedUp` as `true` when either `exitCode` or `signalCode` is not `null`.
  // Otherwise `state` is left untouched.
  const setStdinCleanedUp = ({exitCode, signalCode}, state) => {
    if (exitCode !== null || signalCode !== null) {
      state.stdinCleanedUp = true;
    }
  };
  // We ignore EPIPEs on writable streams and aborts on readable streams since those can happen normally.
  // When one stream errors, the error is propagated to the other streams on the same file descriptor.
  // Those other streams might have a different direction due to the above.
  // When this happens, the direction of both the initial stream and the others should then be taken into account.
  // Therefore, we keep track of whether a stream error is currently propagating.
  //
  // Rethrows `error` unchanged unless `shouldIgnoreStreamError()` reports that it should be ignored.
  const handleStreamError = (error, fdNumber, streamInfo, isSameDirection) => {
    if (!shouldIgnoreStreamError(error, fdNumber, streamInfo, isSameDirection)) {
      throw error;
    }
  };
  // Returns whether `error` should be ignored for the stream on file descriptor `fdNumber`.
  // `isSameDirection` defaults to `true`.
  const shouldIgnoreStreamError = (error, fdNumber, streamInfo, isSameDirection = true) => {
    // An error is already propagating: both EPIPEs and aborts are ignored, whatever the direction
    if (streamInfo.propagating) {
      return isStreamEpipe(error) || isStreamAbort(error);
    }

    // First error seen: remember it on `streamInfo` for the later calls
    streamInfo.propagating = true;
    // Only EPIPE is ignored when the input-direction check equals `isSameDirection`, only aborts otherwise
    return isInputFileDescriptor(streamInfo, fdNumber) === isSameDirection
      ? isStreamEpipe(error)
      : isStreamAbort(error);
  };
  // Unfortunately, we cannot use the stream's class or properties to know whether it is readable or writable.
  // For example, `subprocess.stdin` is technically a Duplex, but can only be used as a writable.
  // Therefore, we need to use the file descriptor's direction (`stdin` is input, `stdout` is output, etc.).
  // However, while `subprocess.std*` and transforms follow that direction, any stream passed the `std*` option has the opposite direction.
  // For example, `subprocess.stdin` is a writable, but the `stdin` option is a readable.
  // `fdNumber` `'all'` is never considered an input.
  export const isInputFileDescriptor = ({fileDescriptors}, fdNumber) => fdNumber !== 'all' && fileDescriptors[fdNumber].direction === 'input';
  // When `stream.destroy()` is called without an `error` argument, stream is aborted.
  // This is the only way to abort a readable stream, which can be useful in some instances.
  // Therefore, we ignore this error on readable streams.
  // A nullish `error` yields `false`.
  export const isStreamAbort = error => error?.code === 'ERR_STREAM_PREMATURE_CLOSE';
  // When `stream.write()` is called but the underlying source has been closed, `EPIPE` is emitted.
  // When piping subprocesses, the source subprocess usually decides when to stop piping.
  // However, there are some instances when the destination does instead, such as `... | head -n1`.
  // It notifies the source by using `EPIPE`.
  // Therefore, we ignore this error on writable streams.
  // A nullish `error` yields `false`.
  const isStreamEpipe = error => error?.code === 'EPIPE';