123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- import {Readable} from 'node:stream';
- import {callbackify} from 'node:util';
- import {BINARY_ENCODINGS} from '../arguments/encoding-option.js';
- import {getFromStream} from '../arguments/fd-options.js';
- import {iterateOnSubprocessStream, DEFAULT_OBJECT_HIGH_WATER_MARK} from '../io/iterate.js';
- import {createDeferred} from '../utils/deferred.js';
- import {addConcurrentStream, waitForConcurrentStreams} from './concurrent.js';
- import {
- safeWaitForSubprocessStdin,
- waitForSubprocessStdout,
- waitForSubprocess,
- destroyOtherStream,
- } from './shared.js';
// Build a `Readable` fed from the subprocess `stdout` (or the stream picked by `from`).
// Its end/destroy lifecycle is wired to the subprocess through `onStdoutFinished` and `onReadableDestroy`.
export const createReadable = ({subprocess, concurrentStreams, encoding}, {from, binary: binaryOption = true, preserveNewlines = true} = {}) => {
	// A binary encoding turns binary mode on even when the `binary` option is `false`
	const binary = binaryOption || BINARY_ENCODINGS.has(encoding);
	const source = getSubprocessStdout(subprocess, from, concurrentStreams);
	const {subprocessStdout} = source;
	const streamOptions = getReadableOptions(subprocessStdout, binary);
	const methods = getReadableMethods({
		subprocessStdout,
		subprocess,
		binary,
		encoding,
		preserveNewlines,
	});
	// `Readable` expects a callback-style `destroy`, while `onReadableDestroy` is promise-based
	const destroy = callbackify(onReadableDestroy.bind(undefined, {
		subprocessStdout,
		subprocess,
		waitReadableDestroy: source.waitReadableDestroy,
	}));
	const readable = new Readable({
		read: methods.read,
		destroy,
		highWaterMark: streamOptions.readableHighWaterMark,
		objectMode: streamOptions.readableObjectMode,
		encoding: streamOptions.readableEncoding,
	});
	// Not awaited: this runs in the background and handles its own errors
	onStdoutFinished({
		subprocessStdout,
		onStdoutDataDone: methods.onStdoutDataDone,
		readable,
		subprocess,
	});
	return readable;
};
// Look up the subprocess stream designated by `from` (`stdout`, or another stream depending on `from`),
// and pass it to `addConcurrentStream()` under the `readableDestroy` name
export const getSubprocessStdout = (subprocess, from, concurrentStreams) => {
	const subprocessStdout = getFromStream(subprocess, from);
	return {
		subprocessStdout,
		waitReadableDestroy: addConcurrentStream(concurrentStreams, subprocessStdout, 'readableDestroy'),
	};
};
// Compute the `Readable` options from the source stream's own settings.
// In binary mode, those settings are reused as is.
// Otherwise, object mode is forced with `DEFAULT_OBJECT_HIGH_WATER_MARK`, keeping only the encoding.
export const getReadableOptions = ({readableEncoding, readableObjectMode, readableHighWaterMark}, binary) => {
	if (binary) {
		return {readableEncoding, readableObjectMode, readableHighWaterMark};
	}

	return {
		readableEncoding,
		readableObjectMode: true,
		readableHighWaterMark: DEFAULT_OBJECT_HIGH_WATER_MARK,
	};
};
// Create the `read()` implementation of the `Readable`, together with the deferred
// that `onRead()` resolves once the underlying iterator is done
export const getReadableMethods = ({subprocessStdout, subprocess, binary, encoding, preserveNewlines}) => {
	const onStdoutDataDone = createDeferred();
	const shouldEncode = !binary;
	const onStdoutData = iterateOnSubprocessStream({
		subprocessStdout,
		subprocess,
		binary,
		shouldEncode,
		encoding,
		preserveNewlines,
	});

	// Must not be an arrow function: `this` is the stream `read()` is called on
	const read = function () {
		onRead(this, onStdoutData, onStdoutDataDone);
	};

	return {read, onStdoutDataDone};
};
// Pull the next chunk from the `stdout` iterator and push it to `readable`.
// Once the iterator is exhausted, resolve `onStdoutDataDone` instead.
const onRead = async (readable, onStdoutData, onStdoutDataDone) => {
	try {
		const chunk = await onStdoutData.next();
		if (!chunk.done) {
			readable.push(chunk.value);
			return;
		}

		onStdoutDataDone.resolve();
	} catch {
		// Deliberately ignored here.
		// NOTE(review): presumably stream failures are surfaced through `onStdoutFinished()` instead — confirm
	}
};
// Mirror the end of `subprocess.stdout` onto `readable`.
// On success, `readable` is ended only after `stdout`, the subprocess, `stdin` and the pending reads are all done.
// On failure, `readable` is destroyed with the same error, after waiting for `stdin`.
export const onStdoutFinished = async ({subprocessStdout, onStdoutDataDone, readable, subprocess, subprocessStdin}) => {
	try {
		await waitForSubprocessStdout(subprocessStdout);
		await subprocess;
		await safeWaitForSubprocessStdin(subprocessStdin);
		await onStdoutDataDone;

		// Nothing to end if `readable` is no longer readable by then
		const isStillReadable = readable.readable;
		if (!isStillReadable) {
			return;
		}

		readable.push(null);
	} catch (failure) {
		await safeWaitForSubprocessStdin(subprocessStdin);
		destroyOtherReadable(readable, failure);
	}
};
// Propagate an abort/error of `readable` to `subprocess.stdout`, then wait for the subprocess.
// Skipped when `waitForConcurrentStreams()` resolves to a falsy value.
export const onReadableDestroy = async ({subprocessStdout, subprocess, waitReadableDestroy}, error) => {
	const shouldDestroy = await waitForConcurrentStreams(waitReadableDestroy, subprocess);
	if (!shouldDestroy) {
		return;
	}

	destroyOtherReadable(subprocessStdout, error);
	await waitForSubprocess(subprocess, error);
};
// Delegate to `destroyOtherStream()`, using the stream's `readable` property as its second argument
const destroyOtherReadable = (stream, error) => {
	const {readable: isReadable} = stream;
	destroyOtherStream(stream, isReadable, error);
};
|