duplex.js 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import {Duplex} from 'node:stream';
  2. import {callbackify} from 'node:util';
  3. import {BINARY_ENCODINGS} from '../arguments/encoding-option.js';
  4. import {
  5. getSubprocessStdout,
  6. getReadableOptions,
  7. getReadableMethods,
  8. onStdoutFinished,
  9. onReadableDestroy,
  10. } from './readable.js';
  11. import {
  12. getSubprocessStdin,
  13. getWritableMethods,
  14. onStdinFinished,
  15. onWritableDestroy,
  16. } from './writable.js';
  // Build one `Duplex` stream for a subprocess by combining the pieces used by
  // `subprocess.readable()` (read side) and `subprocess.writable()` (write side).
  //
  // First argument: `{subprocess, concurrentStreams, encoding}`.
  // Second argument (optional): `{from, to, binary = true, preserveNewlines = true}`.
  // Returns the newly created `Duplex`.
  export const createDuplex = ({subprocess, concurrentStreams, encoding}, {from, to, binary: binaryOption = true, preserveNewlines = true} = {}) => {
    // Binary mode is on when the option is truthy or when `encoding` is listed in `BINARY_ENCODINGS`
    const isBinary = binaryOption || BINARY_ENCODINGS.has(encoding);

    // Resolve both ends of the subprocess, then the read-side options and methods
    const {subprocessStdout, waitReadableDestroy} = getSubprocessStdout(subprocess, from, concurrentStreams);
    const {subprocessStdin, waitWritableFinal, waitWritableDestroy} = getSubprocessStdin(subprocess, to, concurrentStreams);
    const {readableEncoding, readableObjectMode, readableHighWaterMark} = getReadableOptions(subprocessStdout, isBinary);
    const {read, onStdoutDataDone} = getReadableMethods({
      subprocessStdout,
      subprocess,
      binary: isBinary,
      encoding,
      preserveNewlines,
    });

    // Write-side methods, plus a single `destroy` that tears down both sides
    const writableMethods = getWritableMethods(subprocessStdin, subprocess, waitWritableFinal);
    const destroyState = {
      subprocessStdout,
      subprocessStdin,
      subprocess,
      waitReadableDestroy,
      waitWritableFinal,
      waitWritableDestroy,
    };
    // `onDuplexDestroy` is async, so it is adapted to the callback style via `callbackify()`
    const destroy = callbackify(onDuplexDestroy.bind(undefined, destroyState));

    // The write-side settings are copied from the subprocess stdin stream
    const {writableHighWaterMark, writableObjectMode} = subprocessStdin;

    const stream = new Duplex({
      read,
      ...writableMethods,
      destroy,
      readableHighWaterMark,
      writableHighWaterMark,
      readableObjectMode,
      writableObjectMode,
      encoding: readableEncoding,
    });

    // Register completion handling for each side of the new stream
    onStdoutFinished({
      subprocessStdout,
      onStdoutDataDone,
      readable: stream,
      subprocess,
      subprocessStdin,
    });
    onStdinFinished(subprocessStdin, stream, subprocessStdout);

    return stream;
  };
  // Destroy handler for the `Duplex`: runs the readable-side and writable-side
  // destroy logic together and resolves once both have completed.
  // `error` is forwarded unchanged to both handlers.
  const onDuplexDestroy = async (streams, error) => {
    const {
      subprocessStdout,
      subprocessStdin,
      subprocess,
      waitReadableDestroy,
      waitWritableFinal,
      waitWritableDestroy,
    } = streams;

    // Start both teardowns before awaiting either, so they proceed concurrently
    const readableDestroyed = onReadableDestroy({subprocessStdout, subprocess, waitReadableDestroy}, error);
    const writableDestroyed = onWritableDestroy({
      subprocessStdin,
      subprocess,
      waitWritableFinal,
      waitWritableDestroy,
    }, error);

    await Promise.all([readableDestroyed, writableDestroyed]);
  };