setup.js 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. import isPlainObject from 'is-plain-obj';
  2. import {normalizePipeArguments} from './pipe-arguments.js';
  3. import {handlePipeArgumentsError} from './throw.js';
  4. import {waitForBothSubprocesses} from './sequence.js';
  5. import {pipeSubprocessStream} from './streaming.js';
  6. import {unpipeOnAbort} from './abort.js';
  // Pipe a subprocess' `stdout`/`stderr`/`stdio` into another subprocess' `stdin`.
  //
  // Two call shapes are handled:
  //  - When the first pipe argument is a plain object, nothing is piped yet: that object is
  //    merged over `sourceInfo.boundOptions` (its keys win) and a copy of this function bound
  //    to the updated `sourceInfo` is returned, ready to receive the remaining arguments.
  //  - Otherwise the arguments are normalized and the asynchronous piping logic is started.
  //    The promise that is returned also gets a `.pipe()` method, so calls can be chained.
  export const pipeToSubprocess = (sourceInfo, ...pipeArguments) => {
    if (isPlainObject(pipeArguments[0])) {
      return pipeToSubprocess.bind(undefined, {
        ...sourceInfo,
        boundOptions: {...sourceInfo.boundOptions, ...pipeArguments[0]},
      });
    }

    const {destination, ...normalizedInfo} = normalizePipeArguments(sourceInfo, ...pipeArguments);
    const promise = handlePipePromise({...normalizedInfo, destination});

    // A chained `.pipe()` uses the current destination as its source, the current promise as
    // its source promise, and starts again with no bound options.
    promise.pipe = pipeToSubprocess.bind(undefined, {
      ...sourceInfo,
      source: destination,
      sourcePromise: promise,
      boundOptions: {},
    });
    return promise;
  };
  // Asynchronous logic when piping subprocesses.
  //
  // Resolves (or rejects) with whichever settles first: waiting for both subprocesses, or one
  // of the promises returned by `unpipeOnAbort()` for `unpipeSignal`.
  const handlePipePromise = async ({
    sourcePromise,
    sourceStream,
    sourceOptions,
    sourceError,
    destination,
    destinationStream,
    destinationError,
    unpipeSignal,
    fileDescriptors,
    startTime,
  }) => {
    // This must run before the arguments are validated: if validation throws, the subprocess
    // promises are never awaited, and wrapping them first avoids unhandled rejections.
    const subprocessPromises = getSubprocessPromises(sourcePromise, destination);
    handlePipeArgumentsError({
      sourceStream,
      sourceError,
      destinationStream,
      destinationError,
      fileDescriptors,
      sourceOptions,
      startTime,
    });

    const maxListenersController = new AbortController();
    try {
      const mergedStream = pipeSubprocessStream(sourceStream, destinationStream, maxListenersController);
      return await Promise.race([
        waitForBothSubprocesses(subprocessPromises),
        ...unpipeOnAbort(unpipeSignal, {
          sourceStream,
          mergedStream,
          sourceOptions,
          fileDescriptors,
          startTime,
        }),
      ]);
    } finally {
      // Always signal the controller once piping has settled, whether it succeeded or threw.
      maxListenersController.abort();
    }
  };
  // `.pipe()` awaits the subprocess promises.
  // When invalid arguments are passed to `.pipe()`, we throw an error, which prevents awaiting them.
  // We need to ensure this does not create unhandled rejections.
  // `Promise.allSettled()` never rejects: it resolves with one `{status, value | reason}` entry
  // for the source and one for the destination, in that order.
  const getSubprocessPromises = (sourcePromise, destination) => Promise.allSettled([sourcePromise, destination]);