generator.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. import {Transform, getDefaultHighWaterMark} from 'node:stream';
  2. import {isAsyncGenerator} from '../stdio/type.js';
  3. import {getSplitLinesGenerator, getAppendNewlineGenerator} from './split.js';
  4. import {getValidateTransformInput, getValidateTransformReturn} from './validate.js';
  5. import {getEncodingTransformGenerator} from './encoding-transform.js';
  6. import {
  7. pushChunks,
  8. transformChunk,
  9. finalChunks,
  10. destroyTransform,
  11. } from './run-async.js';
  12. import {
  13. pushChunksSync,
  14. transformChunkSync,
  15. finalChunksSync,
  16. runTransformSync,
  17. } from './run-sync.js';
  18. /*
  19. Generators can be used to transform/filter standard streams.
  20. Generators have a simple syntax, yet allows all of the following:
  21. - Sharing `state` between chunks
  22. - Flushing logic, by using a `final` function
  23. - Asynchronous logic
  24. - Emitting multiple chunks from a single source chunk, even if spaced in time, by using multiple `yield`
  25. - Filtering, by using no `yield`
  26. Therefore, there is no need to allow Node.js or web transform streams.
  27. The `highWaterMark` is kept as the default value, since this is what `subprocess.std*` uses.
  28. Chunks are currently processed serially. We could add a `concurrency` option to parallelize in the future.
  29. Transform an array of generator functions into a `Transform` stream.
  30. `Duplex.from(generator)` cannot be used because it does not allow setting the `objectMode` and `highWaterMark`.
  31. */
  32. export const generatorToStream = ({
  33. value,
  34. value: {transform, final, writableObjectMode, readableObjectMode},
  35. optionName,
  36. }, {encoding}) => {
  37. const state = {};
  38. const generators = addInternalGenerators(value, encoding, optionName);
  39. const transformAsync = isAsyncGenerator(transform);
  40. const finalAsync = isAsyncGenerator(final);
  41. const transformMethod = transformAsync
  42. ? pushChunks.bind(undefined, transformChunk, state)
  43. : pushChunksSync.bind(undefined, transformChunkSync);
  44. const finalMethod = transformAsync || finalAsync
  45. ? pushChunks.bind(undefined, finalChunks, state)
  46. : pushChunksSync.bind(undefined, finalChunksSync);
  47. const destroyMethod = transformAsync || finalAsync
  48. ? destroyTransform.bind(undefined, state)
  49. : undefined;
  50. const stream = new Transform({
  51. writableObjectMode,
  52. writableHighWaterMark: getDefaultHighWaterMark(writableObjectMode),
  53. readableObjectMode,
  54. readableHighWaterMark: getDefaultHighWaterMark(readableObjectMode),
  55. transform(chunk, encoding, done) {
  56. transformMethod([chunk, generators, 0], this, done);
  57. },
  58. flush(done) {
  59. finalMethod([generators], this, done);
  60. },
  61. destroy: destroyMethod,
  62. });
  63. return {stream};
  64. };
  65. // Applies transform generators in sync mode
  66. export const runGeneratorsSync = (chunks, stdioItems, encoding, isInput) => {
  67. const generators = stdioItems.filter(({type}) => type === 'generator');
  68. const reversedGenerators = isInput ? generators.reverse() : generators;
  69. for (const {value, optionName} of reversedGenerators) {
  70. const generators = addInternalGenerators(value, encoding, optionName);
  71. chunks = runTransformSync(generators, chunks);
  72. }
  73. return chunks;
  74. };
  75. // Generators used internally to convert the chunk type, validate it, and split into lines
  76. const addInternalGenerators = (
  77. {transform, final, binary, writableObjectMode, readableObjectMode, preserveNewlines},
  78. encoding,
  79. optionName,
  80. ) => {
  81. const state = {};
  82. return [
  83. {transform: getValidateTransformInput(writableObjectMode, optionName)},
  84. getEncodingTransformGenerator(binary, encoding, writableObjectMode),
  85. getSplitLinesGenerator(binary, preserveNewlines, writableObjectMode, state),
  86. {transform, final},
  87. {transform: getValidateTransformReturn(readableObjectMode, optionName)},
  88. getAppendNewlineGenerator({
  89. binary,
  90. preserveNewlines,
  91. readableObjectMode,
  92. state,
  93. }),
  94. ].filter(Boolean);
  95. };