Skip to content

Commit

Permalink
stream: make pipeline try to wait for 'close'
Browse files Browse the repository at this point in the history
Pipeline uses eos which will invoke the callback
on 'finish' and 'end' before all streams have been
fully destroyed.

Fixes: nodejs#32032

PR-URL: nodejs#32158
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
  • Loading branch information
ronag authored and addaleax committed Mar 27, 2020
1 parent cba9f2e commit 1428a92
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
13 changes: 13 additions & 0 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,35 @@ function eos(stream, opts, callback) {

const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

// TODO (ronag): Improve soft detection to include core modules and
// common ecosystem modules that do properly emit 'close' but fail
// this generic check.
const willEmitClose = (
state &&
state.autoDestroy &&
state.emitClose &&
state.closed === false
);

let writableFinished = stream.writableFinished ||
(wState && wState.finished);
const onfinish = () => {
writableFinished = true;
if (willEmitClose && (!stream.readable || readable)) return;
if (!readable || readableEnded) callback.call(stream);
};

let readableEnded = stream.readableEnded ||
(rState && rState.endEmitted);
const onend = () => {
readableEnded = true;
if (willEmitClose && (!stream.writable || writable)) return;
if (!writable || writableFinished) callback.call(stream);
};

Expand Down
43 changes: 42 additions & 1 deletion test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const {
Readable,
Transform,
pipeline,
PassThrough
PassThrough,
Duplex
} = require('stream');
const assert = require('assert');
const http = require('http');
Expand Down Expand Up @@ -1077,3 +1078,43 @@ const { promisify } = require('util');
assert.ifError(err);
}));
}

{
let closed = false;
const src = new Readable({
read() {},
destroy(err, cb) {
process.nextTick(cb);
}
});
const dst = new Writable({
write(chunk, encoding, callback) {
callback();
}
});
src.on('close', () => {
closed = true;
});
src.push(null);
pipeline(src, dst, common.mustCall((err) => {
assert.strictEqual(closed, true);
}));
}

{
let closed = false;
const src = new Readable({
read() {},
destroy(err, cb) {
process.nextTick(cb);
}
});
const dst = new Duplex({});
src.on('close', common.mustCall(() => {
closed = true;
}));
src.push(null);
pipeline(src, dst, common.mustCall((err) => {
assert.strictEqual(closed, true);
}));
}

0 comments on commit 1428a92

Please sign in to comment.