stream.js 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. import {isReadableStream} from 'is-stream';
  2. import {asyncIterator} from '@sec-ant/readable-stream/ponyfill';
  3. export const getAsyncIterable = stream => {
  4. if (isReadableStream(stream, {checkOpen: false}) && nodeImports.on !== undefined) {
  5. return getStreamIterable(stream);
  6. }
  7. if (typeof stream?.[Symbol.asyncIterator] === 'function') {
  8. return stream;
  9. }
  10. // `ReadableStream[Symbol.asyncIterator]` support is missing in multiple browsers, so we ponyfill it
  11. if (toString.call(stream) === '[object ReadableStream]') {
  12. return asyncIterator.call(stream);
  13. }
  14. throw new TypeError('The first argument must be a Readable, a ReadableStream, or an async iterable.');
  15. };
  16. const {toString} = Object.prototype;
  17. // The default iterable for Node.js streams does not allow for multiple readers at once, so we re-implement it
  18. const getStreamIterable = async function * (stream) {
  19. const controller = new AbortController();
  20. const state = {};
  21. handleStreamEnd(stream, controller, state);
  22. try {
  23. for await (const [chunk] of nodeImports.on(stream, 'data', {signal: controller.signal})) {
  24. yield chunk;
  25. }
  26. } catch (error) {
  27. // Stream failure, for example due to `stream.destroy(error)`
  28. if (state.error !== undefined) {
  29. throw state.error;
  30. // `error` event directly emitted on stream
  31. } else if (!controller.signal.aborted) {
  32. throw error;
  33. // Otherwise, stream completed successfully
  34. }
  35. // The `finally` block also runs when the caller throws, for example due to the `maxBuffer` option
  36. } finally {
  37. stream.destroy();
  38. }
  39. };
  40. const handleStreamEnd = async (stream, controller, state) => {
  41. try {
  42. await nodeImports.finished(stream, {
  43. cleanup: true,
  44. readable: true,
  45. writable: false,
  46. error: false,
  47. });
  48. } catch (error) {
  49. state.error = error;
  50. } finally {
  51. controller.abort();
  52. }
  53. };
  54. // Loaded by the Node entrypoint, but not by the browser one.
  55. // This prevents using dynamic imports.
  56. export const nodeImports = {};