readable.js 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import {Readable} from 'node:stream';
  2. import {callbackify} from 'node:util';
  3. import {BINARY_ENCODINGS} from '../arguments/encoding-option.js';
  4. import {getFromStream} from '../arguments/fd-options.js';
  5. import {iterateOnSubprocessStream, DEFAULT_OBJECT_HIGH_WATER_MARK} from '../io/iterate.js';
  6. import {createDeferred} from '../utils/deferred.js';
  7. import {addConcurrentStream, waitForConcurrentStreams} from './concurrent.js';
  8. import {
  9. safeWaitForSubprocessStdin,
  10. waitForSubprocessStdout,
  11. waitForSubprocess,
  12. destroyOtherStream,
  13. } from './shared.js';
  14. // Create a `Readable` stream that forwards from `stdout` and awaits the subprocess
  15. export const createReadable = ({subprocess, concurrentStreams, encoding}, {from, binary: binaryOption = true, preserveNewlines = true} = {}) => {
  16. const binary = binaryOption || BINARY_ENCODINGS.has(encoding);
  17. const {subprocessStdout, waitReadableDestroy} = getSubprocessStdout(subprocess, from, concurrentStreams);
  18. const {readableEncoding, readableObjectMode, readableHighWaterMark} = getReadableOptions(subprocessStdout, binary);
  19. const {read, onStdoutDataDone} = getReadableMethods({
  20. subprocessStdout,
  21. subprocess,
  22. binary,
  23. encoding,
  24. preserveNewlines,
  25. });
  26. const readable = new Readable({
  27. read,
  28. destroy: callbackify(onReadableDestroy.bind(undefined, {subprocessStdout, subprocess, waitReadableDestroy})),
  29. highWaterMark: readableHighWaterMark,
  30. objectMode: readableObjectMode,
  31. encoding: readableEncoding,
  32. });
  33. onStdoutFinished({
  34. subprocessStdout,
  35. onStdoutDataDone,
  36. readable,
  37. subprocess,
  38. });
  39. return readable;
  40. };
  41. // Retrieve `stdout` (or other stream depending on `from`)
  42. export const getSubprocessStdout = (subprocess, from, concurrentStreams) => {
  43. const subprocessStdout = getFromStream(subprocess, from);
  44. const waitReadableDestroy = addConcurrentStream(concurrentStreams, subprocessStdout, 'readableDestroy');
  45. return {subprocessStdout, waitReadableDestroy};
  46. };
  47. export const getReadableOptions = ({readableEncoding, readableObjectMode, readableHighWaterMark}, binary) => binary
  48. ? {readableEncoding, readableObjectMode, readableHighWaterMark}
  49. : {readableEncoding, readableObjectMode: true, readableHighWaterMark: DEFAULT_OBJECT_HIGH_WATER_MARK};
  50. export const getReadableMethods = ({subprocessStdout, subprocess, binary, encoding, preserveNewlines}) => {
  51. const onStdoutDataDone = createDeferred();
  52. const onStdoutData = iterateOnSubprocessStream({
  53. subprocessStdout,
  54. subprocess,
  55. binary,
  56. shouldEncode: !binary,
  57. encoding,
  58. preserveNewlines,
  59. });
  60. return {
  61. read() {
  62. onRead(this, onStdoutData, onStdoutDataDone);
  63. },
  64. onStdoutDataDone,
  65. };
  66. };
  67. // Forwards data from `stdout` to `readable`
  68. const onRead = async (readable, onStdoutData, onStdoutDataDone) => {
  69. try {
  70. const {value, done} = await onStdoutData.next();
  71. if (done) {
  72. onStdoutDataDone.resolve();
  73. } else {
  74. readable.push(value);
  75. }
  76. } catch {}
  77. };
  78. // When `subprocess.stdout` ends/aborts/errors, do the same on `readable`.
  79. // Await the subprocess, for the same reason as above.
  80. export const onStdoutFinished = async ({subprocessStdout, onStdoutDataDone, readable, subprocess, subprocessStdin}) => {
  81. try {
  82. await waitForSubprocessStdout(subprocessStdout);
  83. await subprocess;
  84. await safeWaitForSubprocessStdin(subprocessStdin);
  85. await onStdoutDataDone;
  86. if (readable.readable) {
  87. readable.push(null);
  88. }
  89. } catch (error) {
  90. await safeWaitForSubprocessStdin(subprocessStdin);
  91. destroyOtherReadable(readable, error);
  92. }
  93. };
  94. // When `readable` aborts/errors, do the same on `subprocess.stdout`
  95. export const onReadableDestroy = async ({subprocessStdout, subprocess, waitReadableDestroy}, error) => {
  96. if (await waitForConcurrentStreams(waitReadableDestroy, subprocess)) {
  97. destroyOtherReadable(subprocessStdout, error);
  98. await waitForSubprocess(subprocess, error);
  99. }
  100. };
  101. const destroyOtherReadable = (stream, error) => {
  102. destroyOtherStream(stream, stream.readable, error);
  103. };