123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- import isPlainObject from 'is-plain-obj';
- import {normalizePipeArguments} from './pipe-arguments.js';
- import {handlePipeArgumentsError} from './throw.js';
- import {waitForBothSubprocesses} from './sequence.js';
- import {pipeSubprocessStream} from './streaming.js';
- import {unpipeOnAbort} from './abort.js';
/**
 * Pipe a subprocess' `stdout`/`stderr`/`stdio` into another subprocess' `stdin`.
 *
 * Two call shapes are handled:
 *  - First pipe argument is a plain object: nothing is piped yet. A copy of this
 *    function is returned, bound to a `sourceInfo` whose `boundOptions` has that
 *    object merged on top of any previously bound options.
 *  - Anything else: the arguments are normalized by `normalizePipeArguments()`,
 *    the asynchronous piping is started, and its promise is returned.
 *
 * @param {object} sourceInfo - Description of the source side of the pipe. Only
 *   `boundOptions` is read here; the rest is forwarded as-is.
 * @param {...*} pipeArguments - Arguments the caller passed to `.pipe()`.
 * @returns {Function|Promise} Either the re-bound function (options-only call), or
 *   the pipe promise, augmented with a `.pipe()` method for chaining.
 */
export const pipeToSubprocess = (sourceInfo, ...pipeArguments) => {
	// Options-only call: accumulate the options and wait for the real call.
	if (isPlainObject(pipeArguments[0])) {
		return pipeToSubprocess.bind(undefined, {
			...sourceInfo,
			boundOptions: {...sourceInfo.boundOptions, ...pipeArguments[0]},
		});
	}

	const {destination, ...normalizedInfo} = normalizePipeArguments(sourceInfo, ...pipeArguments);
	const promise = handlePipePromise({...normalizedInfo, destination});

	// Allow chaining: in the next `.pipe()` call, the current destination becomes the source
	// and the current pipe promise becomes the source promise. Bound options are reset to `{}`,
	// so they are not carried over to the next link of the chain.
	promise.pipe = pipeToSubprocess.bind(undefined, {
		...sourceInfo,
		source: destination,
		sourcePromise: promise,
		boundOptions: {},
	});
	return promise;
};
/**
 * Asynchronous logic when piping subprocesses.
 *
 * Takes the normalized pipe information as a single object; the properties used are
 * `sourcePromise`, `sourceStream`, `sourceOptions`, `sourceError`, `destination`,
 * `destinationStream`, `destinationError`, `unpipeSignal`, `fileDescriptors` and `startTime`.
 *
 * @returns {Promise<*>} Settles like whichever settles first: `waitForBothSubprocesses()`
 *   or one of the promises produced by `unpipeOnAbort()`.
 */
const handlePipePromise = async ({
	sourcePromise,
	sourceStream,
	sourceOptions,
	sourceError,
	destination,
	destinationStream,
	destinationError,
	unpipeSignal,
	fileDescriptors,
	startTime,
}) => {
	// Must happen before the arguments validation below: if that validation throws, the
	// subprocess promises are never awaited, so they need to be observed beforehand to avoid
	// unhandled rejections.
	const subprocessPromises = getSubprocessPromises(sourcePromise, destination);
	handlePipeArgumentsError({
		sourceStream,
		sourceError,
		destinationStream,
		destinationError,
		fileDescriptors,
		sourceOptions,
		startTime,
	});

	// Handed to `pipeSubprocessStream()` and always aborted once piping is over.
	// NOTE(review): presumably used to undo listener-limit changes — confirm in `./streaming.js`.
	const maxListenersController = new AbortController();
	try {
		const mergedStream = pipeSubprocessStream(sourceStream, destinationStream, maxListenersController);
		// `unpipeOnAbort()` returns an iterable, which is spread into the race.
		return await Promise.race([
			waitForBothSubprocesses(subprocessPromises),
			...unpipeOnAbort(unpipeSignal, {
				sourceStream,
				mergedStream,
				sourceOptions,
				fileDescriptors,
				startTime,
			}),
		]);
	} finally {
		// Runs whether the race fulfilled, rejected, or `pipeSubprocessStream()` threw.
		maxListenersController.abort();
	}
};
/**
 * Wrap the source and destination into a single promise that never rejects.
 *
 * `.pipe()` awaits the subprocess promises.
 * When invalid arguments are passed to `.pipe()`, we throw an error, which prevents awaiting them.
 * We need to ensure this does not create unhandled rejections, hence `Promise.allSettled()`:
 * it observes both inputs right away and reports each outcome instead of rejecting.
 *
 * @param {*} sourcePromise - Promise (or plain value) for the source subprocess.
 * @param {*} destination - Destination subprocess; passed to `Promise.allSettled()` as-is.
 * @returns {Promise<Array<{status: string, value?: *, reason?: *}>>} Two settlement results,
 *   source first, destination second.
 */
const getSubprocessPromises = (sourcePromise, destination) => Promise.allSettled([sourcePromise, destination]);
|