run-async.js 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import {callbackify} from 'node:util';
  // Runs `getChunks(...getChunksArguments)` and forwards every chunk it
  // produces to `transformStream.push()`.
  //
  // While the iteration is in progress, the iterable is exposed as
  // `state.currentIterable`; the property is removed again once the iteration
  // ends, whether it completed or threw.
  //
  // The function is wrapped with `callbackify()`, so callers pass a Node-style
  // `(error) => {}` callback as an additional last argument.
  export const pushChunks = callbackify(async (getChunks, state, getChunksArguments, transformStream) => {
    const chunksIterable = getChunks(...getChunksArguments);
    state.currentIterable = chunksIterable;

    try {
      for await (const nextChunk of chunksIterable) {
        transformStream.push(nextChunk);
      }
    } finally {
      // Always clear the reference, even when the iteration failed
      delete state.currentIterable;
    }
  });
  // Passes `chunk` through the `transform()` method of every generator from
  // position `index` onwards, yielding whatever comes out of the last one.
  //
  // Each value produced by one `transform()` is recursively fed to the next
  // generator. A generator without a `transform()` method passes the chunk
  // through unchanged.
  export const transformChunk = async function * (chunk, generators, index) {
    if (index !== generators.length) {
      const {transform: applyTransform = identityGenerator} = generators[index];
      const nextIndex = index + 1;

      for await (const output of applyTransform(chunk)) {
        yield * transformChunk(output, generators, nextIndex);
      }

      return;
    }

    // Every generator has been applied: this is a final output chunk
    yield chunk;
  };
  // Once the input has ended, calls the `final()` method of each generator in
  // order. Every chunk produced by a `final()` method is then passed through
  // the `transform()` methods of the generators that come after it.
  export const finalChunks = async function * (generators) {
    const indexedGenerators = Object.entries(generators);

    for (const [key, generator] of indexedGenerators) {
      const {final: finalMethod} = generator;
      // `Object.entries()` returns keys as strings
      const position = Number(key);
      yield * generatorFinalChunks(finalMethod, position, generators);
    }
  };
  // Yields the final chunks of a single generator.
  //
  // `final` is that generator's optional `final()` method and `index` its
  // position in `generators`. Each chunk produced by `final()` is passed to
  // `transformChunk()` starting at the following position. Nothing is yielded
  // when `final` is `undefined`.
  const generatorFinalChunks = async function * (final, index, generators) {
    if (final !== undefined) {
      const nextIndex = index + 1;

      for await (const trailingChunk of final()) {
        yield * transformChunk(trailingChunk, generators, nextIndex);
      }
    }
  };
  // Cancels any ongoing async generator when the Transform is destroyed,
  // e.g. when the subprocess errors.
  //
  // - When an iterable is currently registered as `currentIterable`, it is
  //   terminated with `.throw(error)` if an `error` was given, or with
  //   `.return()` otherwise. The outcome of that call decides whether this
  //   function fails.
  // - When no iterable is registered, `error` (if any) is rethrown as is.
  //
  // The function is wrapped with `callbackify()`, so callers pass a Node-style
  // `(error) => {}` callback as an additional last argument.
  export const destroyTransform = callbackify(async ({currentIterable}, error) => {
    if (currentIterable === undefined) {
      if (error) {
        throw error;
      }

      return;
    }

    if (error) {
      await currentIterable.throw(error);
    } else {
      await currentIterable.return();
    }
  });
  // Fallback used in place of a missing `transform()` method: yields the
  // chunk it receives, once and unchanged.
  const identityGenerator = function * (chunk) {
    yield chunk;
  };