forked from piscinajs/piscina
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool-close.ts
89 lines (69 loc) · 3.16 KB
/
pool-close.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import { once } from 'node:events';
import { resolve } from 'node:path';
import { test } from 'tap';
import Piscina from '..';
// Closing a pool that never received any task must simply resolve, both with
// the default options and with `minThreads: 0`.
test('close()', async (t) => {
  const filename = resolve(__dirname, 'fixtures/sleep.js');

  t.test('no pending tasks', async (subtest) => {
    const idlePool = new Piscina({ filename });
    await idlePool.close();
    subtest.pass('pool closed successfully');
  });

  t.test('no pending tasks (with minThreads=0)', async (subtest) => {
    const idlePool = new Piscina({ filename, minThreads: 0 });
    await idlePool.close();
    subtest.pass('pool closed successfully');
  });
});
// Submits two tasks to a single-thread pool, then requests a (non-forced)
// close on the next event-loop turn. Both tasks are expected to resolve and
// the pool is expected to emit 'close'.
test('queued tasks waits for all tasks to complete', async (t) => {
  const filename = resolve(__dirname, 'fixtures/sleep.js');
  const pool = new Piscina({ filename, maxThreads: 1 });

  const firstTask = pool.run({ time: 100 });
  const secondTask = pool.run({ time: 100 });

  // Deferred so the 'close' listener below is registered before close() runs.
  setImmediate(() => {
    t.resolves(pool.close(), 'close is resolved when all running tasks are completed');
  });

  const closeEvent = once(pool, 'close');
  const expectations = [
    t.resolves(closeEvent, 'handler is called when pool is closed'),
    t.resolves(firstTask, 'complete running task'),
    t.resolves(secondTask, 'complete running task')
  ];
  await Promise.all(expectations);
});
// Verifies that a task submitted after close() has been requested is rejected
// with the expected abort error instead of being executed.
test('abort any task enqueued during closing up', async (t) => {
  const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });

  // Deferred so that the task awaited at the bottom is submitted first and the
  // close request arrives afterwards.
  setImmediate(() => {
    t.resolves(pool.close(), 'close is resolved when running tasks are completed');
    t.resolves(pool.run({ time: 1000 }).then(
      // Without a fulfilment branch the error checks below would be skipped
      // and the test would pass silently if the late task were not aborted.
      () => {
        t.fail('task enqueued during close must be aborted');
      },
      (err) => {
        t.equal(err.message, 'The task has been aborted');
        t.equal(err.cause, 'queue is being terminated');
      }
    ));
  });

  await t.resolves(pool.run({ time: 100 }), 'complete running task');
});
// Starts two long tasks on a single thread that allows two concurrent tasks
// per worker, then force-closes the pool. The two tasks submitted first are
// expected to resolve; the two submitted after close({ force: true }) are
// expected to reject with an abort error.
test('force: queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => {
  const filename = resolve(__dirname, 'fixtures/sleep.js');
  const pool = new Piscina({ filename, maxThreads: 1, concurrentTasksPerWorker: 2 });

  const runningA = pool.run({ time: 1000 });
  const runningB = pool.run({ time: 1000 });

  // Six assertions: close(), the 'close' event, two resolved tasks and two
  // rejected tasks.
  t.plan(6);

  t.resolves(pool.close({ force: true }));
  t.resolves(once(pool, 'close'), 'handler is called when pool is closed');
  t.resolves(runningA, 'complete running task');
  t.resolves(runningB, 'complete running task');

  for (let attempt = 0; attempt < 2; attempt += 1) {
    t.rejects(pool.run({ time: 100 }), /The task has been aborted/, 'abort task that are not started yet');
  }

  // Awaited one after the other, in submission order.
  for (const running of [runningA, runningB]) {
    await running;
  }
});
// Configures a closeTimeout (500) far shorter than the submitted tasks (5000)
// and requests a close. The pool is expected to emit 'error', both tasks are
// expected to reject with a "Terminating worker thread" error, and close()
// itself is expected to resolve.
test('timed out close operation destroys the pool', async (t) => {
  const options = {
    filename: resolve(__dirname, 'fixtures/sleep.js'),
    maxThreads: 1,
    closeTimeout: 500
  };
  const pool = new Piscina(options);

  const slowTaskA = pool.run({ time: 5000 });
  const slowTaskB = pool.run({ time: 5000 });

  // Deferred so the 'error' listener below is registered before close() runs.
  setImmediate(() => {
    t.resolves(pool.close(), 'close is resolved on timeout');
  });

  const terminated = /Terminating worker thread/;
  const errorEvent = once(pool, 'error');
  const expectations = [
    t.resolves(errorEvent, 'error handler is called on timeout'),
    t.rejects(slowTaskA, terminated, 'task is aborted due to timeout'),
    t.rejects(slowTaskB, terminated, 'task is aborted due to timeout')
  ];
  await Promise.all(expectations);
});