forked from dottorblaster/hoverlord
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
118 lines (103 loc) · 2.82 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
const {
Worker,
isMainThread,
parentPort,
threadId,
} = require('worker_threads');
const crypto = require('crypto');
const { createSupervisor } = require('./supervisor');
const masterSupervisor = isMainThread ? createSupervisor() : null;
const isFromWorker = (payload) => Boolean(payload.fromWorker);
const createFingerprint = () => crypto.randomBytes(64).toString('hex');
const createWorkerContent = (jobCode) => {
return `(${jobCode})(require(${JSON.stringify(__filename)}));`;
}
const spawn = (job, name) => {
return new Promise((resolve) => {
const jobCode = job.toString();
const workerContent = createWorkerContent(jobCode);
const actor = new Worker(workerContent, { eval: true });
actor.on('message', (payload) => {
if (isFromWorker(payload)) {
const { recipient } = payload;
masterSupervisor.send(recipient, payload);
}
});
masterSupervisor.store(name, actor);
resolve(actor);
});
};
const receive = (reducer, startState = undefined) => {
let state = startState;
parentPort.on('message', (message) => {
state = reducer(state, message);
});
};
const send = (recipient, content) => {
if (isMainThread) {
masterSupervisor.send(recipient, { content });
} else {
parentPort.postMessage({ fromWorker: true, recipient, content });
}
};
const reply = (request, response) => {
const { fingerprint: requestFingerprint, sender: requestSender } = request;
const message = {
requestSender,
content: response,
recipient: requestSender,
fingerprint: requestFingerprint,
sender: threadId,
fromWorker: !isMainThread,
};
if (isMainThread) {
masterSupervisor.send(message);
} else {
parentPort.postMessage(message);
}
};
const call = (recipient, messageContent) => {
return new Promise((resolve) => {
const fingerprint = createFingerprint();
const message = {
recipient,
fingerprint,
content: messageContent,
sender: threadId,
fromWorker: !isMainThread,
};
if (isMainThread) {
const actor = masterSupervisor.getProcess(recipient);
const callback = (payload) => {
if (payload.fingerprint === fingerprint) {
resolve(payload);
actor.off('message', callback);
}
};
actor.on('message', callback);
masterSupervisor.send(recipient, message);
} else {
const callback = (payload) => {
if (payload.fingerprint === fingerprint) {
resolve(payload);
parentPort.off('message', callback);
}
};
parentPort.on('message', callback);
parentPort.postMessage(message);
}
});
};
const shutdown = (supervisor = masterSupervisor) => {
supervisor.shutdown();
};
module.exports = {
spawn,
receive,
send,
call,
reply,
shutdown,
createSupervisor,
masterSupervisor,
};