forked from piscinajs/piscina
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfixed-queue.ts
210 lines (160 loc) · 6.12 KB
/
fixed-queue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import { test } from 'tap';
import { kQueueOptions } from '../dist/symbols';
import { Piscina, FixedQueue, PiscinaTask as Task } from '..';
import { resolve } from 'node:path';
// Minimal stand-in for a Piscina task used by the queue tests in this file.
// The tests only push, shift and remove instances, so the class implements
// nothing beyond the `kQueueOptions` accessor.
// @ts-expect-error - it misses several properties, but it's enough for the test
class QueueTask implements Task {
// No per-task queue options: the accessor always yields `null`.
get [kQueueOptions] () {
return null;
}
}
// Size bookkeeping: a fresh queue reports 0, one push() brings it to 1, and
// shifting that single item out brings it back to 0.
test('queue length', async (t) => {
  const fixedQueue = new FixedQueue();
  t.equal(fixedQueue.size, 0);

  fixedQueue.push(new QueueTask());
  t.equal(fixedQueue.size, 1);

  fixedQueue.shift();
  t.equal(fixedQueue.size, 0);
});
// shift() on an empty queue must leave the reported size at 0 rather than
// driving it below zero.
test('queue length should not become negative', async (t) => {
  const fixedQueue = new FixedQueue();
  t.equal(fixedQueue.size, 0);

  fixedQueue.shift();
  t.equal(fixedQueue.size, 0);
});
// remove() of a task that was previously pushed must shrink the queue back
// to empty.
test('queue remove', async (t) => {
  const fixedQueue = new FixedQueue();
  const queuedTask = new QueueTask();
  t.equal(fixedQueue.size, 0, 'should be empty on start');

  fixedQueue.push(queuedTask);
  t.equal(fixedQueue.size, 1, 'should contain single task after push');

  fixedQueue.remove(queuedTask);
  t.equal(fixedQueue.size, 0, 'should be empty after task removal');
});
// remove() of a task that was never pushed must neither throw nor change the
// reported size.
test('remove not queued task should not lead to errors', async (t) => {
  const fixedQueue = new FixedQueue();
  const strayTask = new QueueTask();
  t.equal(fixedQueue.size, 0, 'should be empty on start');

  fixedQueue.remove(strayTask);
  t.equal(fixedQueue.size, 0, 'should be empty after task removal');
});
test('removing elements from intermediate CircularBuffer should not lead to issues', async (t) => {
  /*
    Scenario under test:
    1) Fill the queue with enough items for 3 full circular buffers.
    2) Empty the middle circular buffer via remove().
    3) The middle buffer should then be dropped from the queue:
       - Before emptying: tail buffer -> middle buffer -> head buffer.
       - After emptying: tail buffer -> head buffer.
  */
  const fixedQueue = new FixedQueue();

  // size of single circular buffer
  const perBuffer = 2047;
  const makeBatch = () => Array.from({ length: perBuffer }, () => new QueueTask());
  const firstBatch = makeBatch();
  const secondBatch = makeBatch();
  const thirdBatch = makeBatch();

  const allTasks = [...firstBatch, ...secondBatch, ...thirdBatch];
  allTasks.forEach((task) => {
    fixedQueue.push(task);
  });
  t.equal(fixedQueue.size, allTasks.length, `should contain ${perBuffer} * 3 items`);

  // Remove the middle batch one task at a time, checking the size after each
  // removal.
  let remaining = fixedQueue.size;
  secondBatch.forEach((task) => {
    fixedQueue.remove(task);
    remaining -= 1;
    t.equal(fixedQueue.size, remaining, `should contain ${remaining} items`);
  });

  // Draining must yield the surviving tasks in their original FIFO order.
  const expected = [...firstBatch, ...thirdBatch];
  const actual = [];
  while (!fixedQueue.isEmpty()) {
    actual.push(fixedQueue.shift());
  }

  t.same(actual, expected);
});
test('removing elements from first CircularBuffer should not lead to issues', async (t) => {
  /*
    Scenario under test:
    1) Fill the queue with enough items for 3 full circular buffers.
    2) Empty the first circular buffer via remove().
    3) The tail buffer should then be dropped from the queue:
       - Before emptying: tail buffer -> middle buffer -> head buffer.
       - After emptying: tail buffer (previously middle) -> head buffer.
  */
  const fixedQueue = new FixedQueue();

  // size of single circular buffer
  const perBuffer = 2047;
  const makeBatch = () => Array.from({ length: perBuffer }, () => new QueueTask());
  const firstBatch = makeBatch();
  const secondBatch = makeBatch();
  const thirdBatch = makeBatch();

  const allTasks = [...firstBatch, ...secondBatch, ...thirdBatch];
  allTasks.forEach((task) => {
    fixedQueue.push(task);
  });
  t.equal(fixedQueue.size, allTasks.length, `should contain ${perBuffer} * 3 items`);

  // Remove the first batch one task at a time, checking the size after each
  // removal.
  let remaining = fixedQueue.size;
  firstBatch.forEach((task) => {
    fixedQueue.remove(task);
    remaining -= 1;
    t.equal(fixedQueue.size, remaining, `should contain ${remaining} items`);
  });

  // Draining must yield the surviving tasks in their original FIFO order.
  const expected = [...secondBatch, ...thirdBatch];
  const actual = [];
  while (!fixedQueue.isEmpty()) {
    actual.push(fixedQueue.shift());
  }

  t.same(actual, expected);
});
test('removing elements from last CircularBuffer should not lead to issues', async (t) => {
  /*
    Scenario under test:
    1) Fill the queue with enough items for 3 full circular buffers.
    2) Empty the last circular buffer via remove().
    3) The head buffer should then be dropped from the queue:
       - Before emptying: tail buffer -> middle buffer -> head buffer.
       - After emptying: tail buffer -> head buffer (previously middle).
  */
  const fixedQueue = new FixedQueue();

  // size of single circular buffer
  const perBuffer = 2047;
  const makeBatch = () => Array.from({ length: perBuffer }, () => new QueueTask());
  const firstBatch = makeBatch();
  const secondBatch = makeBatch();
  const thirdBatch = makeBatch();

  const allTasks = [...firstBatch, ...secondBatch, ...thirdBatch];
  allTasks.forEach((task) => {
    fixedQueue.push(task);
  });
  t.equal(fixedQueue.size, allTasks.length, `should contain ${perBuffer} * 3 items`);

  // Remove the last batch one task at a time, checking the size after each
  // removal.
  let remaining = fixedQueue.size;
  thirdBatch.forEach((task) => {
    fixedQueue.remove(task);
    remaining -= 1;
    t.equal(fixedQueue.size, remaining, `should contain ${remaining} items`);
  });

  // Draining must yield the surviving tasks in their original FIFO order.
  const expected = [...firstBatch, ...secondBatch];
  const actual = [];
  while (!fixedQueue.isEmpty()) {
    actual.push(fixedQueue.shift());
  }

  t.same(actual, expected);
});
// Smoke test: a Piscina pool configured with a FixedQueue as its `taskQueue`
// must still run a task to completion. The fixture is expected to resolve
// with the string 'done'.
test('simple integration with Piscina', async ({ equal }) => {
  const queue = new FixedQueue();
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/simple-isworkerthread-named-import.ts'),
    taskQueue: queue
  });

  const result = await pool.run(null);
  equal(result, 'done');
});
// Submits several expressions to a FixedQueue-backed pool at once and checks
// that the results, in submission order, equal the values obtained by
// evaluating the same strings locally.
test('concurrent calls with Piscina', async (t) => {
  const fixedQueue = new FixedQueue();
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval-async.js'),
    taskQueue: fixedQueue
  });

  const expressions = ['1+1', '2+2', '3+3'];
  // Start every run before awaiting any of them so the calls overlap.
  const pending = expressions.map((expression) => pool.run(expression));
  const results = await Promise.all(pending);

  // eslint-disable-next-line
  const expected = expressions.map(eval);
  t.same(results, expected);
});