-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
185 lines (145 loc) · 6.91 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
const wait = require('./utils/wait');
const funcIsAsync = require('./utils/funcIsAsync');
/**
* Parallel processing of elements of an array of values
* Параллельная обработка элементов массива значений
*
* @param {array} arrayValues - Array of values for parallel processing
* Массив значений для параллельной обработки
* @param {function} processingFn - A synchronous or asynchronous function that processes an array element
* Синхронная или асинхронная функция, выполняющая обработку элемента массива
* @param {function} [nextValueFn] - A synchronous or asynchronous function that filters arrayValues
* Синхронная или асинхронная функция, выполняющая фильтрацию arrayValues
* @param {number} [parallel = 1000] - The number of parallel processed elements per iteration
* Количество параллельно обрабатываемых элементов за итерацию
* @param {boolean} [awaitRes = false] - Expect an answer i.e. wait for completion to complete in synchronous style
* Ожидать ответ т.е. ждать завершение выполнения в синхронном стиле
* @param {function} [cb] - Synchronous or asynchronous callback function, will work if awaitRes = false.
* An error stack will be passed to the function.
* Синхронная или асинхронная функция обратного вызова, отработает если awaitRes = false.
* В функцию будет передан стек ошибок.
* @param {number} [cycleTimeout = 200] - Asynchronous pause between iterations (milliseconds), for awaitRes = true
* Асинхронная пауза между итерациями (миллисекунды), для awaitRes = true
* @return {Promise<*>} - Will return an error stack if awaitRes = true
* Вернет стек ошибок, если awaitRes = true
*/
async function parallelProcessing({
arrayValues,
processingFn,
nextValueFn,
cycleTimeout = 200,
parallel = 1000,
awaitRes = false,
cb
}) {
const isBadArray = !Array.isArray(arrayValues);
if (isBadArray) throw new Error('Bad array values');
const isBadFn = !processingFn || typeof processingFn !== 'function';
if (isBadFn) throw new Error('Bad processing function');
const stackError = [];
// An array element to start parallel processing from
// Элемент массива с которого начать параллельную обработку
let first = 0;
// Iteration number
// Номер итерации
let i = 1;
const processing = async () => {
const promises = [];
// The final element of parallel processing
// Конечный элемент параллельной обработки
const last = Math.min(i * parallel, arrayValues.length);
for (let j = first; j < last; j++) {
const value = arrayValues[j];
const params = {value, index: j, arrayValues, iteration: i};
let next = false;
// If a filter function is specified, execute it
// true - take the next element of the array for processing
// false - continue processing this array element
// Если задана функция фильтрации выполняем ее
// true - берем следующий элемент массива в обработку
// false - продолжаем обработку данного элемента массива
const isNextFn = nextValueFn && typeof nextValueFn === 'function';
if (isNextFn) {
const isAsync = funcIsAsync(nextValueFn);
if (isAsync) {
next = await nextValueFn(params);
} else {
next = nextValueFn(params);
}
}
if (next) continue;
// Fill An Array Of Parallel Jobs
// Заполняем массив параллельных заданий
let promise = null;
const isAsync = funcIsAsync(processingFn);
if (isAsync) {
// If processFn is an asynchronous function
// Если processFn асинхронная функция
promise = processingFn(params).catch(e => stackError.push({
value,
index: j,
iteration: i,
error: e
}));
promises.push(promise);
} else {
// If processFn is a synchronous function
// Если processFn синхронная функция
promise = async () => {
try {
processingFn(params);
} catch (e) {
stackError.push({
value,
index: j,
iteration: i,
error: e
});
}
};
promises.push(promise());
}
}
// Perform parallel processing
// Выполняем параллельную обработку
await Promise.all(promises);
// If we processed all the elements, we end the parallel processing
// Если обработали все элементы завершаем работу параллельной обработки
const stop = last >= arrayValues.length;
if (stop) {
// If asynchronous execution is in synchronous style
// Если асинхронное выполнение в синхронном стиле
if (awaitRes) return stackError;
// If asynchronous execution
// Если асинхронное выполнение
const isFn = cb && typeof cb === 'function';
if (isFn) {
const isAsync = funcIsAsync(cb);
if (isAsync) {
await cb(stackError);
} else {
cb(stackError);
}
}
return;
}
// Go to the next group of parallel processing
// Переход к следующей группе параллельной обработки
i++;
first = last;
// Pause asynchronously
// Делаем асинхронную паузу
if (awaitRes) {
await wait(cycleTimeout);
return await processing();
} else {
setImmediate(async () => await processing());
}
}
if (awaitRes) {
return await processing();
} else {
setImmediate(async () => await processing());
}
}
module.exports = parallelProcessing;