// output-async.js (3.0 KB)
  1. import mergeStreams from '@sindresorhus/merge-streams';
  2. import {isStandardStream} from '../utils/standard-stream.js';
  3. import {incrementMaxListeners} from '../utils/max-listeners.js';
  4. import {TRANSFORM_TYPES} from '../stdio/type.js';
  5. import {pipeStreams} from './pipeline.js';
  // Applies the `input`, `inputFile`, `stdin`, `stdout` and `stderr` options in async mode, once the subprocess has been spawned.
  // For each file descriptor, transform items are piped first; every other item is handed to `pipeStdioItem()`, which records it in `pipeGroups`.
  // Each `pipeGroups` entry maps one destination stream to the list of source streams feeding it.
  // When a destination has several sources, they are merged first, so the destination ends only once every source has ended.
  export const pipeOutputAsync = (subprocess, fileDescriptors, controller) => {
    const isTransformItem = ({type}) => TRANSFORM_TYPES.has(type);
    const pipeGroups = new Map();

    for (const [fdNumber, {stdioItems, direction}] of Object.entries(fileDescriptors)) {
      const transformItems = stdioItems.filter(stdioItem => isTransformItem(stdioItem));
      for (const transformItem of transformItems) {
        pipeTransform(subprocess, transformItem.stream, direction, fdNumber);
      }

      const otherItems = stdioItems.filter(stdioItem => !isTransformItem(stdioItem));
      for (const otherItem of otherItems) {
        pipeStdioItem({
          subprocess,
          stream: otherItem.stream,
          direction,
          fdNumber,
          pipeGroups,
          controller,
        });
      }
    }

    for (const [destination, sources] of pipeGroups) {
      // A single source is piped as is; several sources go through `mergeStreams()`
      const source = sources.length === 1 ? sources[0] : mergeStreams(sources);
      pipeStreams(source, destination);
    }
  };
  // Pipes a transform `stream` from/to the subprocess stream at `fdNumber`.
  // With the `output` direction, the subprocess stream is the source; otherwise, the transform is the source.
  // When using transforms, `subprocess.stdin|stdout|stderr|stdio` is directly mutated: the transform replaces the original stream.
  const pipeTransform = (subprocess, stream, direction, fdNumber) => {
    const subprocessStream = subprocess.stdio[fdNumber];
    const [source, destination] = direction === 'output'
      ? [subprocessStream, stream]
      : [stream, subprocessStream];
    pipeStreams(source, destination);

    // Only the first three file descriptors have a named property on `subprocess`
    const propertyName = SUBPROCESS_STREAM_PROPERTIES[fdNumber];
    if (propertyName !== undefined) {
      subprocess[propertyName] = stream;
    }

    subprocess.stdio[fdNumber] = stream;
  };

  // Names of the `subprocess` properties, indexed by file descriptor number
  const SUBPROCESS_STREAM_PROPERTIES = ['stdin', 'stdout', 'stderr'];
  // Most `std*` option values involve piping `subprocess.std*` to a stream.
  // That stream is either passed by the user or created internally.
  // Nothing is piped here: the source stream is only appended to the `pipeGroups` entry of its destination stream.
  // Items without any `stream` are skipped.
  const pipeStdioItem = ({subprocess, stream, direction, fdNumber, pipeGroups, controller}) => {
    if (stream === undefined) {
      return;
    }

    setStandardStreamMaxListeners(stream, controller);

    const subprocessStream = subprocess.stdio[fdNumber];
    const isOutput = direction === 'output';
    // `pipeGroups` keys are the streams being written to, values are the streams being read from
    const destination = isOutput ? stream : subprocessStream;
    const source = isOutput ? subprocessStream : stream;
    const previousSources = pipeGroups.get(destination) ?? [];
    pipeGroups.set(destination, [...previousSources, source]);
  };
  // Several subprocesses might be piping from/to `process.std*` at the same time.
  // This is not necessarily an error, so it should not print a `maxListeners` warning.
  // Streams for which `isStandardStream()` returns `false` are left untouched.
  const setStandardStreamMaxListeners = (stream, {signal}) => {
    if (!isStandardStream(stream)) {
      return;
    }

    incrementMaxListeners(stream, MAX_LISTENERS_INCREMENT, signal);
  };

  // `source.pipe(destination)` adds at most 1 listener for each event.
  // If the `stdin` option is an array, the values might be combined with `merge-streams`.
  // That library also listens for the `source` end, which adds 1 more listener.
  const MAX_LISTENERS_INCREMENT = 2;