// pipeline.js (1.3 KB)

  1. import {finished} from 'node:stream/promises';
  2. import {isStandardStream} from '../utils/standard-stream.js';
  // Similar to `Stream.pipeline(source, destination)`, but does not destroy standard streams.
  // Pipes `source` into `destination`, then starts two watchers that propagate
  // the end/failure of one side to the other.
  export const pipeStreams = (source, destination) => {
    source.pipe(destination);
    // Both watchers are async and intentionally not awaited: they run in the background
    // and ignore any rejection from `finished()` themselves.
    onSourceFinish(source, destination);
    onDestinationFinish(source, destination);
  };
  // `source.pipe(destination)` makes `destination` end when `source` ends.
  // But it does not propagate aborts or errors. This function does it.
  // Does nothing when either stream is reported as a standard stream by `isStandardStream()`.
  const onSourceFinish = async (source, destination) => {
    if (isStandardStream(source) || isStandardStream(destination)) {
      return;
    }

    try {
      // Only the readable side of `source` is awaited
      await finished(source, {cleanup: true, readable: true, writable: false});
    } catch {
      // Deliberately ignored: whether `source` ended or failed, `destination` is ended below
    }

    endDestinationStream(destination);
  };
  // Ends `destination`, unless it is no longer writable (in which case this is a noop).
  export const endDestinationStream = destination => {
    if (destination.writable) {
      destination.end();
    }
  };
  // We do the same thing in the other direction as well:
  // once `destination` has finished or failed, `source` is aborted.
  // Does nothing when either stream is reported as a standard stream by `isStandardStream()`.
  const onDestinationFinish = async (source, destination) => {
    if (isStandardStream(source) || isStandardStream(destination)) {
      return;
    }

    try {
      // Only the writable side of `destination` is awaited
      await finished(destination, {cleanup: true, readable: false, writable: true});
    } catch {
      // Deliberately ignored: whether `destination` finished or failed, `source` is aborted below
    }

    abortSourceStream(source);
  };
  // Destroys `source`, unless it is no longer readable (in which case this is a noop).
  export const abortSourceStream = source => {
    if (source.readable) {
      source.destroy();
    }
  };