writable.js 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. import {Writable} from 'node:stream';
  2. import {callbackify} from 'node:util';
  3. import {getToStream} from '../arguments/fd-options.js';
  4. import {addConcurrentStream, waitForConcurrentStreams} from './concurrent.js';
  5. import {
  6. safeWaitForSubprocessStdout,
  7. waitForSubprocessStdin,
  8. waitForSubprocess,
  9. destroyOtherStream,
  10. } from './shared.js';
  11. // Create a `Writable` stream that forwards to `stdin` and awaits the subprocess
  12. export const createWritable = ({subprocess, concurrentStreams}, {to} = {}) => {
  13. const {subprocessStdin, waitWritableFinal, waitWritableDestroy} = getSubprocessStdin(subprocess, to, concurrentStreams);
  14. const writable = new Writable({
  15. ...getWritableMethods(subprocessStdin, subprocess, waitWritableFinal),
  16. destroy: callbackify(onWritableDestroy.bind(undefined, {
  17. subprocessStdin,
  18. subprocess,
  19. waitWritableFinal,
  20. waitWritableDestroy,
  21. })),
  22. highWaterMark: subprocessStdin.writableHighWaterMark,
  23. objectMode: subprocessStdin.writableObjectMode,
  24. });
  25. onStdinFinished(subprocessStdin, writable);
  26. return writable;
  27. };
  28. // Retrieve `stdin` (or other stream depending on `to`)
  29. export const getSubprocessStdin = (subprocess, to, concurrentStreams) => {
  30. const subprocessStdin = getToStream(subprocess, to);
  31. const waitWritableFinal = addConcurrentStream(concurrentStreams, subprocessStdin, 'writableFinal');
  32. const waitWritableDestroy = addConcurrentStream(concurrentStreams, subprocessStdin, 'writableDestroy');
  33. return {subprocessStdin, waitWritableFinal, waitWritableDestroy};
  34. };
  35. export const getWritableMethods = (subprocessStdin, subprocess, waitWritableFinal) => ({
  36. write: onWrite.bind(undefined, subprocessStdin),
  37. final: callbackify(onWritableFinal.bind(undefined, subprocessStdin, subprocess, waitWritableFinal)),
  38. });
  39. // Forwards data from `writable` to `stdin`
  40. const onWrite = (subprocessStdin, chunk, encoding, done) => {
  41. if (subprocessStdin.write(chunk, encoding)) {
  42. done();
  43. } else {
  44. subprocessStdin.once('drain', done);
  45. }
  46. };
  47. // Ensures that the writable `final` and readable `end` events awaits the subprocess.
  48. // Like this, any subprocess failure is propagated as a stream `error` event, instead of being lost.
  49. // The user does not need to `await` the subprocess anymore, but now needs to await the stream completion or error.
  50. // When multiple writables are targeting the same stream, they wait for each other, unless the subprocess ends first.
  51. const onWritableFinal = async (subprocessStdin, subprocess, waitWritableFinal) => {
  52. if (await waitForConcurrentStreams(waitWritableFinal, subprocess)) {
  53. if (subprocessStdin.writable) {
  54. subprocessStdin.end();
  55. }
  56. await subprocess;
  57. }
  58. };
  59. // When `subprocess.stdin` ends/aborts/errors, do the same on `writable`.
  60. export const onStdinFinished = async (subprocessStdin, writable, subprocessStdout) => {
  61. try {
  62. await waitForSubprocessStdin(subprocessStdin);
  63. if (writable.writable) {
  64. writable.end();
  65. }
  66. } catch (error) {
  67. await safeWaitForSubprocessStdout(subprocessStdout);
  68. destroyOtherWritable(writable, error);
  69. }
  70. };
  71. // When `writable` aborts/errors, do the same on `subprocess.stdin`
  72. export const onWritableDestroy = async ({subprocessStdin, subprocess, waitWritableFinal, waitWritableDestroy}, error) => {
  73. await waitForConcurrentStreams(waitWritableFinal, subprocess);
  74. if (await waitForConcurrentStreams(waitWritableDestroy, subprocess)) {
  75. destroyOtherWritable(subprocessStdin, error);
  76. await waitForSubprocess(subprocess, error);
  77. }
  78. };
  79. const destroyOtherWritable = (stream, error) => {
  80. destroyOtherStream(stream, stream.writable, error);
  81. };