diff --git a/JavaScript/5-pipe.js b/JavaScript/5-pipe.js index a1fe395..9be8981 100644 --- a/JavaScript/5-pipe.js +++ b/JavaScript/5-pipe.js @@ -117,8 +117,10 @@ class Queue { resume() { this.paused = false; if (this.waiting.length > 0) { - const hasChannel = this.count < this.concurrency; - if (hasChannel) this.takeNext(); + const channels = this.concurrency - this.count; + for (let i = 0; i < channels; i++) { + this.takeNext(); + } } return this; } @@ -132,18 +134,4 @@ class Queue { } } -// Usage - -const destination = Queue.channels(2) - .wait(5000) - .process((task, next) => next(null, { ...task, processed: true })) - .done((err, task) => console.log({ task })); - -const source = Queue.channels(3) - .timeout(4000) - .process((task, next) => setTimeout(next, task.interval, null, task)) - .pipe(destination); - -for (let i = 0; i < 10; i++) { - source.add({ name: `Task${i}`, interval: 1000 }); -} +module.exports = Queue; diff --git a/queue.js b/queue.js new file mode 100644 index 0000000..a8dd953 --- /dev/null +++ b/queue.js @@ -0,0 +1,33 @@ +'use strict'; + +process.once('unhandledRejection', err => { + console.error(err); + process.exit(1); +}); + +const Queue = require('./JavaScript/5-pipe'); + +const fs = require('fs').promises; +const path = require('path'); + +const N = 1000; + +const q = new Queue(100).pause(); + +for (let i = 0; i < N; i++) { + q.add({ path: path.join(process.cwd(), `file${i}`), i }); +} + +let result = 0; + +q.process(({ path, i }, next) => { + fs.readFile(path).then(data => { + result += data[i]; + }); +}).done(() => { + console.timeEnd(); + console.log(result); +}); + +console.time(); +q.resume();