iterate.js 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import {on} from 'node:events';
  2. import {getDefaultHighWaterMark} from 'node:stream';
  3. import {getEncodingTransformGenerator} from '../transform/encoding-transform.js';
  4. import {getSplitLinesGenerator} from '../transform/split.js';
  5. import {transformChunkSync, finalChunksSync} from '../transform/run-sync.js';
  // Iterate over `subprocess.stdout`, used by `subprocess.readable|duplex|iterable()`.
  // When `subprocessStdout` is in object mode, both `shouldEncode` and `shouldSplit` are passed as `false`.
  // Otherwise, `shouldSplit` is `true` and `shouldEncode` follows the caller's value.
  export const iterateOnSubprocessStream = ({subprocessStdout, subprocess, binary, shouldEncode, encoding, preserveNewlines}) => {
    const controller = new AbortController();
    // Intentionally not awaited: it runs in the background and aborts `controller` once `subprocess` settles
    stopReadingOnExit(subprocess, controller);

    const isNotObjectMode = !subprocessStdout.readableObjectMode;
    const streamOptions = {
      stream: subprocessStdout,
      controller,
      binary,
      shouldEncode: isNotObjectMode && shouldEncode,
      encoding,
      shouldSplit: isNotObjectMode,
      preserveNewlines,
    };
    return iterateOnStream(streamOptions);
  };
  // Wait for `subprocess` to settle, then abort `controller`.
  // A rejection of `subprocess` is ignored here on purpose: only the fact that it settled matters.
  const stopReadingOnExit = async (subprocess, controller) => {
    try {
      await subprocess;
    } catch {
      // Ignored: the controller is aborted below whether `subprocess` succeeded or failed
    }

    controller.abort();
  };
  // Iterate over a subprocess output stream, used by `result.stdout` and the `verbose: 'full'` option.
  // Applies the `lines` and `encoding` options.
  // The stream is handled as object mode only when `stream.readableObjectMode` is set and `allMixed` is not.
  // In that case, `shouldEncode` and `shouldSplit` are both passed as `false`.
  export const iterateForResult = ({stream, onStreamEnd, lines, encoding, stripFinalNewline, allMixed}) => {
    const controller = new AbortController();
    // Intentionally not awaited: it aborts `controller` in the background once `onStreamEnd` settles
    stopReadingOnStreamEnd(onStreamEnd, controller, stream);

    const objectMode = stream.readableObjectMode && !allMixed;
    const isNotObjectMode = !objectMode;
    const isBinary = encoding === 'buffer';
    return iterateOnStream({
      stream,
      controller,
      binary: isBinary,
      shouldEncode: isNotObjectMode,
      encoding,
      shouldSplit: isNotObjectMode && lines,
      preserveNewlines: !stripFinalNewline,
    });
  };
  // Wait for `onStreamEnd` to settle, then abort `controller`.
  // If `onStreamEnd` rejects, `stream` is destroyed before the controller is aborted.
  const stopReadingOnStreamEnd = async (onStreamEnd, controller, stream) => {
    try {
      await onStreamEnd;
    } catch {
      // The rejection itself is not propagated from here
      stream.destroy();
    } finally {
      // Runs on both success and failure
      controller.abort();
    }
  };
  // Listen to the `data` events of `stream` using `events.on()`, then hand the resulting async iterable to `iterateOnData()`.
  // Aborting `controller` is what stops `events.on()` from listening.
  // All the other options are forwarded to `iterateOnData()` unchanged.
  const iterateOnStream = ({stream, controller, ...transformOptions}) => {
    const listenerOptions = {
      signal: controller.signal,
      highWaterMark: HIGH_WATER_MARK,
      // Backward compatibility with older name for this option
      // See https://github.com/nodejs/node/pull/52080#discussion_r1525227861
      // @todo Remove after removing support for Node 21
      highWatermark: HIGH_WATER_MARK,
    };
    const onStdoutChunk = on(stream, 'data', listenerOptions);
    return iterateOnData({onStdoutChunk, controller, ...transformOptions});
  };
  // `events.on()` measures its `highWaterMark` in number of events, not in bytes.
  // Since the average amount of bytes per `data` event is unknown, the same heuristic as streams in objectMode is used: they have the same issue.
  // This is why both constants below are the value of `getDefaultHighWaterMark(true)`.
  // Note: this option does not exist on Node 18, but this is ok since the logic works without it. It just consumes more memory.
  export const DEFAULT_OBJECT_HIGH_WATER_MARK = getDefaultHighWaterMark(true);
  const HIGH_WATER_MARK = DEFAULT_OBJECT_HIGH_WATER_MARK;
  // Async generator yielding the transformed output of each `data` event.
  // - `onStdoutChunk`: async iterable of `data` event argument lists. Only the first argument of each event is used.
  // - `controller`: when its signal is aborted, an error thrown by `onStdoutChunk` is treated as a normal end of iteration.
  // - `binary`, `shouldEncode`, `encoding`, `shouldSplit`, `preserveNewlines`: forwarded to `getGenerators()`.
  // Each chunk goes through `transformChunkSync()`. Once reading ended normally, `finalChunksSync()` is yielded exactly once.
  // Any error thrown while the signal is not aborted is rethrown to the consumer, without yielding the final chunks.
  const iterateOnData = async function * ({onStdoutChunk, controller, binary, shouldEncode, encoding, shouldSplit, preserveNewlines}) {
    const generators = getGenerators({
      binary,
      shouldEncode,
      encoding,
      shouldSplit,
      preserveNewlines,
    });

    try {
      for await (const [chunk] of onStdoutChunk) {
        yield * transformChunkSync(chunk, generators, 0);
      }
    } catch (error) {
      // An error raised after the signal was aborted only means reading was stopped on purpose
      if (!controller.signal.aborted) {
        throw error;
      }
    } finally {
      // This block must not `yield`: it also runs when the consumer stops early (`break` or `.return()`) and when a genuine error is being rethrown.
      // Yielding here would keep the generator suspended instead of completing, and would emit chunks right before an error.
      // Aborting ensures the listener is released however the iteration ended. This is a noop if the signal is already aborted.
      controller.abort();
    }

    // Only reached when reading ended normally, i.e. neither on a rethrown error nor on an early return
    yield * finalChunksSync(generators);
  };
  // Build the list of transform generators applied to each chunk: first the encoding one, then the line-splitting one.
  // Any falsy value returned by those two factories is removed from the list.
  // NOTE(review): the negated `shouldEncode`/`shouldSplit` arguments look like "skip" flags - confirm against the `transform` modules.
  const getGenerators = ({binary, shouldEncode, encoding, shouldSplit, preserveNewlines}) => {
    const encodingGenerator = getEncodingTransformGenerator(binary, encoding, !shouldEncode);
    const splitLinesGenerator = getSplitLinesGenerator(binary, preserveNewlines, !shouldSplit, {});
    return [encodingGenerator, splitLinesGenerator].filter(Boolean);
  };