diff options
Diffstat (limited to 'lib/internal/streams/end-of-stream.js')
-rw-r--r-- | lib/internal/streams/end-of-stream.js | 61 |
1 files changed, 35 insertions, 26 deletions
diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 318ab4c2e6..efc2441c51 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -98,8 +98,7 @@ function eos(stream, options, callback) { isWritable(stream) === writable ); - let writableFinished = stream.writableFinished || - (wState && wState.finished); + let writableFinished = stream.writableFinished || wState?.finished; const onfinish = () => { writableFinished = true; // Stream should not be destroyed here. If it is that @@ -111,8 +110,7 @@ function eos(stream, options, callback) { if (!readable || readableEnded) callback.call(stream); }; - let readableEnded = stream.readableEnded || - (rState && rState.endEmitted); + let readableEnded = stream.readableEnded || rState?.endEmitted; const onend = () => { readableEnded = true; // Stream should not be destroyed here. If it is that @@ -128,7 +126,17 @@ function eos(stream, options, callback) { callback.call(stream, err); }; + let closed = wState?.closed || rState?.closed; + const onclose = () => { + closed = true; + + const errored = wState?.errored || rState?.errored; + + if (errored && typeof errored !== 'boolean') { + return callback.call(stream, errored); + } + if (readable && !readableEnded) { if (!isReadableEnded(stream)) return callback.call(stream, @@ -139,6 +147,7 @@ function eos(stream, options, callback) { return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); } + callback.call(stream); }; @@ -168,29 +177,29 @@ function eos(stream, options, callback) { if (options.error !== false) stream.on('error', onerror); stream.on('close', onclose); - // _closed is for OutgoingMessage which is not a proper Writable. - const closed = (!wState && !rState && stream._closed === true) || ( - (wState && wState.closed) || - (rState && rState.closed) || - (wState && wState.errorEmitted) || - (rState && rState.errorEmitted) || - (rState && stream.req && stream.aborted) || - ( - (!writable || (wState && wState.finished)) && - (!readable || (rState && rState.endEmitted)) - ) - ); - if (closed) { - // TODO(ronag): Re-throw error if errorEmitted? - // TODO(ronag): Throw premature close as if finished was called? - // before being closed? i.e. if closed but not errored, ended or finished. - // TODO(ronag): Throw some kind of error? Does it make sense - // to call finished() on a "finished" stream? - // TODO(ronag): willEmitClose? - process.nextTick(() => { - callback(); - }); + process.nextTick(onclose); + } else if (wState?.errorEmitted || rState?.errorEmitted) { + if (!willEmitClose) { + process.nextTick(onclose); + } + } else if ( + !readable && + (!willEmitClose || stream.readable) && + writableFinished + ) { + process.nextTick(onclose); + } else if ( + !writable && + (!willEmitClose || stream.writable) && + readableEnded + ) { + process.nextTick(onclose); + } else if (!wState && !rState && stream._closed === true) { + // _closed is for OutgoingMessage which is not a proper Writable. + process.nextTick(onclose); + } else if ((rState && stream.req && stream.aborted)) { + process.nextTick(onclose); } const cleanup = () => { |