Skip to content

Commit

Permalink
stream: add signal support to pipeline generators
Browse files Browse the repository at this point in the history
Generators in pipeline must be able to be aborted or pipeline
can deadlock.

PR-URL: nodejs#39067
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
ronag committed Aug 25, 2021
1 parent 2acd866 commit c04d621
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 33 deletions.
51 changes: 44 additions & 7 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1870,16 +1870,14 @@ const { pipeline } = require('stream/promises');

async function run() {
const ac = new AbortController();
const options = {
signal: ac.signal,
};
const signal = ac.signal;

setTimeout(() => ac.abort(), 1);
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
options,
{ signal },
);
}

Expand All @@ -1895,10 +1893,10 @@ const fs = require('fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source) {
async function* (source, signal) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield chunk.toUpperCase();
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt')
Expand All @@ -1909,6 +1907,28 @@ async function run() {
run().catch(console.error);
```

Remember to handle the `signal` argument passed into the async generator.
Especially in the case where the async generator is the source for the
pipeline (i.e. first argument) or the pipeline will never complete.

```js
const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
await pipeline(
async function * (signal) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
* `Readable` streams which have emitted `'end'` or `'close'`.
* `Writable` streams which have emitted `'finish'` or `'close'`.
Expand Down Expand Up @@ -3407,13 +3427,20 @@ the `Readable.from()` utility method:
```js
const { Readable } = require('stream');

const ac = new AbortController();
const signal = ac.signal;

async function * generate() {
yield 'a';
await someLongRunningFn({ signal });
yield 'b';
yield 'c';
}

const readable = Readable.from(generate());
readable.on('close', () => {
ac.abort();
});

readable.on('data', (chunk) => {
console.log(chunk);
Expand All @@ -3433,21 +3460,31 @@ const { pipeline: pipelinePromise } = require('stream/promises');

const writable = fs.createWriteStream('./file');

const ac = new AbortController();
const signal = ac.signal;

const iterator = createIterator({ signal });

// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err);
} else {
console.log(value, 'value returned');
}
}).on('close', () => {
ac.abort();
});

// Promise Pattern
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
.catch(console.error);
.catch((err) => {
console.error(err);
ac.abort();
});
```

<!--type=misc-->
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const pipeline = require('internal/streams/pipeline');
const { pipeline } = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');
const {
Expand Down
18 changes: 14 additions & 4 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const from = require('internal/streams/from');
const {
isBlob,
} = require('internal/blob');
const { AbortController } = require('internal/abort_controller');

const {
FunctionPrototypeCall
Expand Down Expand Up @@ -81,14 +82,15 @@ module.exports = function duplexify(body, name) {
// }

if (typeof body === 'function') {
const { value, write, final } = fromAsyncGen(body);
const { value, write, final, destroy } = fromAsyncGen(body);

if (isIterable(value)) {
return from(Duplexify, value, {
// TODO (ronag): highWaterMark?
objectMode: true,
write,
final
final,
destroy
});
}

Expand Down Expand Up @@ -123,7 +125,8 @@ module.exports = function duplexify(body, name) {
process.nextTick(cb, err);
}
});
}
},
destroy
});
}

Expand Down Expand Up @@ -202,15 +205,18 @@ module.exports = function duplexify(body, name) {

function fromAsyncGen(fn) {
let { promise, resolve } = createDeferredPromise();
const ac = new AbortController();
const signal = ac.signal;
const value = fn(async function*() {
while (true) {
const { chunk, done, cb } = await promise;
process.nextTick(cb);
if (done) return;
if (signal.aborted) throw new AbortError();
yield chunk;
({ promise, resolve } = createDeferredPromise());
}
}());
}(), { signal });

return {
value,
Expand All @@ -219,6 +225,10 @@ function fromAsyncGen(fn) {
},
final(cb) {
resolve({ done: true, cb });
},
destroy(err, cb) {
ac.abort();
cb(err);
}
};
}
Expand Down
40 changes: 35 additions & 5 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@ const {
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED,
},
AbortError,
} = require('internal/errors');

const { validateCallback } = require('internal/validators');
const {
validateCallback,
validateAbortSignal
} = require('internal/validators');

const {
isIterable,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

let PassThrough;
let Readable;
Expand Down Expand Up @@ -168,19 +173,37 @@ function pipeline(...streams) {
streams = streams[0];
}

return pipelineImpl(streams, callback);
}

function pipelineImpl(streams, callback, opts) {
if (streams.length < 2) {
throw new ERR_MISSING_ARGS('streams');
}

const ac = new AbortController();
const signal = ac.signal;
const outerSignal = opts?.signal;

validateAbortSignal(outerSignal, 'options.signal');

function abort() {
finishImpl(new AbortError());
}

outerSignal?.addEventListener('abort', abort);

let error;
let value;
const destroys = [];

let finishCount = 0;

function finish(err) {
const final = --finishCount === 0;
finishImpl(err, --finishCount === 0);
}

function finishImpl(err, final) {
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
error = err;
}
Expand All @@ -193,6 +216,9 @@ function pipeline(...streams) {
destroys.shift()(error);
}

outerSignal?.removeEventListener('abort', abort);
ac.abort();

if (final) {
callback(error, value);
}
Expand All @@ -211,7 +237,7 @@ function pipeline(...streams) {

if (i === 0) {
if (typeof stream === 'function') {
ret = stream();
ret = stream({ signal });
if (!isIterable(ret)) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
Expand All @@ -223,7 +249,7 @@ function pipeline(...streams) {
}
} else if (typeof stream === 'function') {
ret = makeAsyncIterable(ret);
ret = stream(ret);
ret = stream(ret, { signal });

if (reading) {
if (!isIterable(ret, true)) {
Expand Down Expand Up @@ -291,7 +317,11 @@ function pipeline(...streams) {
}
}

if (signal?.aborted || outerSignal?.aborted) {
process.nextTick(abort);
}

return ret;
}

module.exports = pipeline;
module.exports = { pipelineImpl, pipeline };
2 changes: 1 addition & 1 deletion lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ const {
promisify: { custom: customPromisify },
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const compose = require('internal/streams/compose');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
Expand Down
18 changes: 3 additions & 15 deletions lib/stream/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,12 @@ const {
Promise,
} = primordials;

const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');

const {
validateAbortSignal,
} = require('internal/validators');

const {
isIterable,
isNodeStream,
} = require('internal/streams/utils');

const pl = require('internal/streams/pipeline');
const { pipelineImpl: pl } = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');

function pipeline(...streams) {
Expand All @@ -29,19 +21,15 @@ function pipeline(...streams) {
!isNodeStream(lastArg) && !isIterable(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
validateAbortSignal(signal, 'options.signal');
}

const pipe = pl(...streams, (err, value) => {
pl(streams, (err, value) => {
if (err) {
reject(err);
} else {
resolve(value);
}
});
if (signal) {
addAbortSignalNoValidate(signal, pipe);
}
}, { signal });
});
}

Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ const {
Duplex,
addAbortSignal,
} = require('stream');
const pipelinep = require('stream/promises').pipeline;
const assert = require('assert');
const http = require('http');
const { promisify } = require('util');
const net = require('net');
const tsp = require('timers/promises');

{
let finished = false;
Expand Down Expand Up @@ -1387,3 +1389,20 @@ const net = require('net');
assert.strictEqual(res, content);
}));
}

{
const ac = new AbortController();
const signal = ac.signal;
pipelinep(
async function * ({ signal }) {
await tsp.setTimeout(1e6, signal);
},
async function(source) {

},
{ signal }
).catch(common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
ac.abort();
}

0 comments on commit c04d621

Please sign in to comment.