forked from piscinajs/piscina
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathatomics-optimization.ts
122 lines (99 loc) · 3.64 KB
/
atomics-optimization.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import { resolve } from 'node:path';
import { test } from 'tap';
import Piscina from '..';
// Exercises the pool's synchronous Atomics path: a 2-thread pool with 2
// concurrent tasks per worker receives 4 tasks that coordinate with this
// thread through a single shared Int32 cell (i32array[0]).
// NOTE(review): the fixture is not visible here; presumably each task ORs the
// bit `1 << index` into i32array[0], notifies, and then blocks until the cell
// is reset — confirm against fixtures/notify-then-sleep-or.js.
test('coverage test for Atomics optimization (sync mode)', async ({ equal }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/notify-then-sleep-or.js'),
minThreads: 2,
maxThreads: 2,
concurrentTasksPerWorker: 2,
atomics: 'sync'
});
// Promises for the 4 submitted tasks; awaited together at the very end.
const tasks = [];
// Most recent snapshot of i32array[0], shared by both wait loops below.
let v: number;
// Post 4 tasks, and wait for all of them to be ready.
const i32array = new Int32Array(new SharedArrayBuffer(4));
for (let index = 0; index < 4; index++) {
tasks.push(pool.run({ i32array, index }));
}
// Wait for 2 tasks to enter 'wait' state.
// Atomics.wait() only blocks while the cell still holds `v`, so an update
// that lands between the load and the wait is not missed.
do {
v = Atomics.load(i32array, 0);
if (popcount8(v) >= 2) break;
Atomics.wait(i32array, 0, v);
} while (true);
// The check above could also be !== 2 but it's hard to get things right
// sometimes and this gives us a nice assertion. Basically, at this point
// exactly 2 tasks should be in Atomics.wait() state.
equal(popcount8(v), 2);
// Wake both tasks up as simultaneously as possible. The other 2 tasks should
// then start executing.
// The cell is cleared before notifying so the second loop starts from 0.
Atomics.store(i32array, 0, 0);
Atomics.notify(i32array, 0, Infinity);
// Wait for the other 2 tasks to enter 'wait' state.
do {
v = Atomics.load(i32array, 0);
if (popcount8(v) >= 2) break;
Atomics.wait(i32array, 0, v);
} while (true);
// At this point, the first two tasks are definitely finished and have
// definitely posted results back to the main thread, and the main thread
// has definitely not received them yet, meaning that the Atomics check will
// be used. Making sure that that works is the point of this test.
// Wake up the remaining 2 tasks in order to make sure that the test finishes.
// Do the same consistency check beforehand as above.
equal(popcount8(v), 2);
Atomics.store(i32array, 0, 0);
Atomics.notify(i32array, 0, Infinity);
// Only now is control yielded to the event loop, letting all 4 results arrive.
await Promise.all(tasks);
});
/**
 * Inefficient but straightforward 8-bit popcount.
 *
 * Recursively splits the value into an upper and a lower half until a single
 * bit remains, then sums the halves.
 *
 * @param v - Value to inspect; only bits 0-7 are considered.
 * @returns The number of set bits among the low 8 bits of `v` (0 through 8).
 */
function popcount8 (v : number) : number {
  v &= 0xff;
  // The low-half masks must be binary (`0b…`) literals. Written as `0xb…` they
  // are hex numbers that truncate to 0x1111 / 0x11 / 0x1, which drops bits
  // (e.g. 0b0110 counted as 1) and recurses forever when bit 4 is set.
  if (v & 0b11110000) return popcount8(v >>> 4) + popcount8(v & 0b00001111);
  if (v & 0b00001100) return popcount8(v >>> 2) + popcount8(v & 0b00000011);
  if (v & 0b00000010) return popcount8(v >>> 1) + popcount8(v & 0b00000001);
  // Only bit 0 can still be set here, so `v` is its own popcount (0 or 1).
  return v;
}
// Queues 10000 tasks at once on a two-thread pool running with
// `atomics: 'sync'` and waits for every one of them to settle. Going by the
// test name, this guards against the pool recursing without bound while it
// works through a long backlog.
test('avoids unbounded recursion', async () => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/simple-isworkerthread.ts'),
    minThreads: 2,
    maxThreads: 2,
    atomics: 'sync'
  });

  // Submit everything synchronously, before yielding to the event loop.
  const pending = Array.from({ length: 10000 }, () => pool.run(null));

  await Promise.all(pending);
});
// Runs a single-thread pool with `atomics: 'async'` and checks that a task can
// keep signalling the main thread through shared memory via timers.
// NOTE(review): the fixture is not visible here; presumably it evaluates
// `code` with the task payload bound to `input` — confirm against
// fixtures/eval-params.js.
test('enable async mode', async (t) => {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval-params.js'),
    minThreads: 1,
    maxThreads: 1,
    atomics: 'async'
  });

  // Three independent one-element shared cells, all starting at 0.
  const shared = Array.from(
    { length: 3 },
    () => new Int32Array(new SharedArrayBuffer(4))
  );

  // The script schedules three timers (100/300/500 ms); each one bumps its
  // own cell and notifies any waiter. The script itself evaluates to `true`.
  const code = `
setTimeout(() => { Atomics.add(input.shared[0], 0, 1); Atomics.notify(input.shared[0], 0, Infinity); }, 100);
setTimeout(() => { Atomics.add(input.shared[1], 0, 1); Atomics.notify(input.shared[1], 0, Infinity); }, 300);
setTimeout(() => { Atomics.add(input.shared[2], 0, 1); Atomics.notify(input.shared[2], 0, Infinity); }, 500);
true
`;

  const pending = pool.run({ code, shared });

  t.plan(2);

  // Block this thread on each cell in turn; a wait yields 'ok' only when it
  // was actually woken by a notify while the cell still held 0.
  const waitResults = shared.map((cell) => Atomics.wait(cell, 0, 0));

  t.same(waitResults, ['ok', 'ok', 'ok']);
  t.equal(await pending, true);
});