-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathtask_system.cpp
296 lines (254 loc) · 8.88 KB
/
task_system.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
#ifndef _CRT_SECURE_NO_WARNINGS
#define _CRT_SECURE_NO_WARNINGS
#endif
#include "task_system.h"
#include <TaskScheduler.h>
#include <core/md_common.h>
#include <core/md_log.h>
#include <core/md_allocator.h>
#include <core/md_array.h>
#include <core/md_os.h>
#include <string.h>
#include <atomic_queue.h>
// Blatantly stolen from ImGui (thanks Omar!)
// Tag-type placement new: the NewDummy parameter gives this overload a unique
// signature so PLACEMENT_NEW can construct objects in preallocated storage
// without colliding with any other operator new overload in the program.
struct NewDummy {};
inline void* operator new(size_t, NewDummy, void* ptr) { return ptr; }
inline void operator delete(void*, NewDummy, void*) {} // This is only required so we can use the symmetrical new()
#define PLACEMENT_NEW(_PTR) new(NewDummy(), _PTR)
namespace task_system {
// Capacity of each task slot table. Must remain a power of two: slot indices
// are recovered from IDs by masking with (MAX_TASKS - 1).
#define MAX_TASKS 256
// Capacity (in bytes, including the null terminator) of a task's label buffer.
#define LABEL_SIZE 64
// Build a task ID by packing the current time above the low 8 slot-index bits.
// The time component distinguishes successive tasks that reuse the same slot,
// so a stale ID stops matching once the slot is recycled.
// NOTE(review): assumes md_time_current() and the ID type are wide enough that
// the << 8 does not discard significant bits — confirm against md_os.h.
static inline ID generate_id(uint32_t slot_idx) {
return (md_time_current() << 8) | (slot_idx & (MAX_TASKS - 1));
}
// Recover the slot-table index encoded in the low bits of a task ID.
static inline uint32_t get_slot_idx(ID id) {
    const uint32_t slot_mask = MAX_TASKS - 1;
    return static_cast<uint32_t>(id) & slot_mask;
}
namespace main {
// Free slot indices into main::task_data. A slot is pushed back here at the
// end of MainTask::Execute().
static atomic_queue::AtomicQueue<uint32_t, MAX_TASKS, 0xFFFFFFFF> free_slots;
}
namespace pool {
// Free slot indices into pool::task_data. A slot is pushed back here by
// CompletionActionFreePoolSlot::OnDependenciesComplete().
static atomic_queue::AtomicQueue<uint32_t, MAX_TASKS, 0xFFFFFFFF> free_slots;
}
// Completion action chained onto every AsyncTask: once the task (and anything
// depending on it) finishes, the task's slot index is returned to
// pool::free_slots so the slot can be reused for a new task.
struct CompletionActionFreePoolSlot : public enki::ICompletable {
public:
CompletionActionFreePoolSlot() = default;
void OnDependenciesComplete(enki::TaskScheduler* taskscheduler, uint32_t threadnum ) final {
// Let the base class mark completion first; only then is it safe to make
// the slot eligible for reuse.
ICompletable::OnDependenciesComplete(taskscheduler, threadnum);
pool::free_slots.push(m_slot_idx);
}
uint32_t m_slot_idx = UINT32_MAX;   // slot in pool::task_data to release
enki::Dependency m_dependency = {}; // links this action to its owning task
};
// Task executed on the worker-thread pool. Wraps either a RangeTask (run over
// [0, set_size) in grain_size-sized chunks) or a plain Task (run once).
// A CompletionActionFreePoolSlot is attached as a dependent so the task's
// slot is recycled automatically when execution completes.
class AsyncTask : public enki::ITaskSet {
public:
AsyncTask() = default;
// Range flavor. The set size handed to enkiTS is the number of grains
// (DIV_UP), so each partition index corresponds to one grain of work.
AsyncTask(uint32_t set_size, RangeTask set_func, str_t lbl = {}, ID id = INVALID_ID, uint32_t grain_size = 1)
: ITaskSet(DIV_UP(set_size, grain_size)), m_set_func(set_func), m_set_size(set_size), m_grain_size(grain_size), m_set_complete(0), m_interrupt(false), m_dependency(), m_id(id) {
size_t len = str_copy_to_char_buf(m_buf, sizeof(m_buf), lbl);
m_label = {m_buf, len};
// Wire up the completion action so this slot is freed when the task is done.
m_completion_action.m_slot_idx = get_slot_idx(id);
m_completion_action.SetDependency(m_completion_action.m_dependency, this);
}
// Single-shot flavor: one partition that invokes m_func exactly once.
AsyncTask(Task func, str_t lbl = {}, ID id = INVALID_ID)
: ITaskSet(1), m_func(func), m_set_complete(0), m_interrupt(false), m_dependency(), m_id(id) {
size_t len = str_copy_to_char_buf(m_buf, sizeof(m_buf), lbl);
m_label = {m_buf, len};
m_completion_action.m_slot_idx = get_slot_idx(id);
m_completion_action.SetDependency(m_completion_action.m_dependency, this);
}
void ExecuteRange(enki::TaskSetPartition range, uint32_t threadnum) final {
(void)threadnum;
// Interruption is cooperative: once m_interrupt is set, remaining
// partitions become no-ops, but the task still completes normally.
if (!m_interrupt) {
if (m_set_func) {
// Convert partition indices (grain units) to element indices,
// clamping the final chunk to the true set size.
uint32_t beg = range.start * m_grain_size;
uint32_t end = MIN(m_set_size, range.end * m_grain_size);
m_set_func(beg, end, threadnum);
// Progress is tracked in grain units (see task_fraction_complete).
m_set_complete += (range.end - range.start);
}
else if (m_func) {
m_func();
m_set_complete += 1;
}
}
}
// True while the scheduler has not yet marked the task complete.
inline bool Running() const {
return !GetIsComplete();
}
RangeTask m_set_func = nullptr; // either of these two are executed
Task m_func = nullptr;
uint32_t m_set_size = 0;   // total number of elements in the range
uint32_t m_grain_size = 1; // elements covered by one partition index
std::atomic_uint32_t m_set_complete = 0; // completed partitions, in grain units
std::atomic_bool m_interrupt = false;    // cooperative cancellation flag
enki::Dependency m_dependency = {};
CompletionActionFreePoolSlot m_completion_action = {};
char m_buf[LABEL_SIZE] = ""; // backing storage for m_label
str_t m_label = {};
ID m_id = INVALID_ID; // full ID currently occupying this slot
};
// Task pinned to thread 0, executed from execute_main_task_queue().
// Unlike AsyncTask there is no completion action: the slot is returned to
// main::free_slots directly at the end of Execute().
class MainTask : public enki::IPinnedTask {
public:
MainTask() = default;
MainTask(Task func, str_t lbl = {}, ID id = INVALID_ID) :
IPinnedTask(0), m_function(func), m_id(id) {
// m_buf is zero-initialized and len <= LABEL_SIZE-1, so the label stays
// null-terminated even though strncpy itself does not guarantee it.
// NOTE(review): assumes lbl.ptr is non-null whenever lbl.len > 0 — confirm
// against str_t's contract.
size_t len = MIN(lbl.len, LABEL_SIZE-1);
m_label = {strncpy(m_buf, lbl.ptr, len), len};
}
void Execute() final {
m_function();
// Recycle this slot so it can host a new main-thread task.
main::free_slots.push(get_slot_idx(m_id));
}
Task m_function = nullptr;
enki::Dependency m_dependency = {};
char m_buf[LABEL_SIZE] = ""; // backing storage for m_label
str_t m_label = {};
ID m_id = INVALID_ID; // full ID currently occupying this slot
};
namespace main {
// Storage for main-thread (pinned) tasks, indexed by the slot encoded in ID.
static MainTask task_data[MAX_TASKS];
}
namespace pool {
// Storage for worker-pool tasks, indexed by the slot encoded in ID.
static AsyncTask task_data[MAX_TASKS];
}
// Resolve a task ID to its live task object, checking both the pool and the
// main-thread tables. Returns nullptr when the ID is INVALID_ID or when the
// slot has since been recycled for a newer task (m_id no longer matches).
static inline enki::ICompletable* get_task(ID id) {
    if (id == INVALID_ID) {
        return nullptr;
    }
    const uint32_t slot_idx = get_slot_idx(id);
    AsyncTask* ptask = &pool::task_data[slot_idx];
    if (ptask->m_id == id) {
        return ptask;
    }
    MainTask* mtask = &main::task_data[slot_idx];
    if (mtask->m_id == id) {
        return mtask;
    }
    return nullptr;
}
// Global enkiTS scheduler instance shared by all task-system entry points.
static enki::TaskScheduler ts{};
// Start the scheduler (0 threads lets enkiTS pick a default) and seed both
// free-slot queues so every slot index is initially available.
void initialize(size_t num_threads = 0) {
    ts.Initialize((uint32_t)num_threads);
    uint32_t slot = 0;
    while (slot < MAX_TASKS) {
        pool::free_slots.push(slot);
        main::free_slots.push(slot);
        ++slot;
    }
}
// Block until every queued task has finished, then tear down the scheduler.
void shutdown() { ts.WaitforAllAndShutdown(); }
// Claim a free main-thread slot and construct the task in place.
// The task does not run until enqueue_task() is called with the returned ID.
ID create_main_task(str_t label, Task func) {
    const uint32_t slot = main::free_slots.pop();
    const ID id = generate_id(slot);
    PLACEMENT_NEW(&main::task_data[slot]) MainTask(func, label, id);
    return id;
}
// Claim a free pool slot and construct a single-shot pool task in place.
// The task does not run until enqueue_task() is called with the returned ID.
ID create_pool_task(str_t label, Task func) {
    const uint32_t slot = pool::free_slots.pop();
    const ID id = generate_id(slot);
    PLACEMENT_NEW(&pool::task_data[slot]) AsyncTask(func, label, id);
    return id;
}
// Claim a free pool slot and construct a range task in place. The range
// [0, range_size) is split into grain_size-sized chunks across the pool.
// The task does not run until enqueue_task() is called with the returned ID.
ID create_pool_task(str_t label, uint32_t range_size, RangeTask func, uint32_t grain_size) {
    const uint32_t slot = pool::free_slots.pop();
    const ID id = generate_id(slot);
    PLACEMENT_NEW(&pool::task_data[slot]) AsyncTask(range_size, func, label, id, grain_size);
    return id;
}
// Submit a previously created task to the scheduler. Pool tasks go onto the
// work-stealing pipe; main tasks onto the pinned-task queue. A task that has
// a dependency set must not be enqueued directly — the scheduler launches it
// when its dependency completes. Invalid IDs and dependent tasks are logged
// and asserted on (fix: error messages previously misspelled "enquque").
void enqueue_task(ID id) {
    const uint32_t slot_idx = get_slot_idx(id);

    AsyncTask* ptask = &pool::task_data[slot_idx];
    if (ptask->m_id == id) {
        if (ptask->m_dependency.GetDependencyTask() != nullptr) {
            MD_LOG_DEBUG("Invalid Operation: Attempting to enqueue task which has dependency set.");
            ASSERT(false);
            return;
        }
        ts.AddTaskSetToPipe(ptask);
        return;
    }

    MainTask* mtask = &main::task_data[slot_idx];
    if (mtask->m_id == id) {
        if (mtask->m_dependency.GetDependencyTask() != nullptr) {
            MD_LOG_DEBUG("Invalid Operation: Attempting to enqueue task which has dependency set.");
            ASSERT(false);
            return;
        }
        ts.AddPinnedTask(mtask);
        return;
    }

    MD_LOG_DEBUG("Invalid Operation: Attempting to enqueue invalid task id");
    ASSERT(false);
}
// Drain all pending pinned (main-thread) tasks. Must be called from the
// thread that should execute them (typically the application main loop).
void execute_main_task_queue() {
ts.RunPinnedTasks();
}
// Number of task threads managed by the scheduler (as reported by enkiTS).
size_t pool_num_threads() { return ts.GetNumTaskThreads(); }
// Collect the IDs of all currently running pool tasks into out_id_arr,
// writing at most id_arr_cap entries. Returns the number of IDs written.
// Fix: the capacity check now happens BEFORE each write — the original wrote
// first and compared afterwards, which overflowed out_id_arr when
// id_arr_cap == 0 (the `num_tasks == id_arr_cap` break could never trigger).
size_t pool_running_tasks(ID* out_id_arr, size_t id_arr_cap) {
    size_t num_tasks = 0;
    for (size_t i = 0; i < MAX_TASKS && num_tasks < id_arr_cap; ++i) {
        if (pool::task_data[i].Running()) {
            out_id_arr[num_tasks++] = pool::task_data[i].m_id;
        }
    }
    return num_tasks;
}
void pool_interrupt_running_tasks() {
for (uint32_t i = 0; i < MAX_TASKS; ++i) {
if (pool::task_data[i].Running()) {
pool::task_data[i].m_interrupt = true;
}
}
}
// Block the calling thread until every scheduled task has completed.
void pool_wait_for_completion() {
ts.WaitforAll();
}
// Make task_id start only after dep_id completes, via an enkiTS dependency.
// A task with a dependency set must not be passed to enqueue_task(); the
// scheduler launches it automatically when the dependency finishes.
// Silently does nothing if either ID no longer resolves to a live task.
void set_task_dependency(ID task_id, ID dep_id) {
enki::ICompletable* dep = get_task(dep_id);
if (dep == NULL) return;
uint32_t task_idx = get_slot_idx(task_id);
// task_id may live in either table; try the pool first, then main.
if (pool::task_data[task_idx].m_id == task_id) {
pool::task_data[task_idx].SetDependency(pool::task_data[task_idx].m_dependency, dep);
return;
}
if (main::task_data[task_idx].m_id == task_id) {
main::task_data[task_idx].SetDependency(main::task_data[task_idx].m_dependency, dep);
return;
}
}
// True if the ID refers to a live pool task that has not yet completed.
// (Local renamed: the original variable `Task` shadowed the Task type.)
bool task_is_running(ID id) {
    const AsyncTask* task = &pool::task_data[get_slot_idx(id)];
    if (task->m_id != id) {
        return false;
    }
    return task->Running();
}
// Label of the pool task identified by id, or an empty str_t if the ID is
// stale or invalid.
str_t task_label(ID id) {
    const AsyncTask* task = &pool::task_data[get_slot_idx(id)];
    if (task->m_id != id) {
        return str_t{};
    }
    return task->m_label;
}
// Fraction of the task's partitions that have executed, in [0, 1].
// Fix: guard against m_SetSize == 0 (a range task created with set_size 0
// has zero partitions), which previously produced a float division by zero.
// Local also renamed: the original variable `Task` shadowed the Task type.
float task_fraction_complete(ID id) {
    const AsyncTask* task = &pool::task_data[get_slot_idx(id)];
    if (task->m_id != id || task->m_SetSize == 0) {
        return 0.f;
    }
    return (float)task->m_set_complete / (float)task->m_SetSize;
}
// Block until the pool task identified by id completes. No-op for stale or
// invalid IDs, or for tasks that have already finished.
void task_wait_for(ID id) {
    AsyncTask* task = &pool::task_data[get_slot_idx(id)];
    const bool live = (task->m_id == id) && task->Running();
    if (live) {
        ts.WaitforTask(task);
    }
}
// Request cooperative interruption of the pool task identified by id.
// No-op for stale or invalid IDs.
void task_interrupt(ID id) {
    AsyncTask* task = &pool::task_data[get_slot_idx(id)];
    if (task->m_id != id) {
        return;
    }
    task->m_interrupt = true;
}
// Request interruption of the pool task identified by id, then block until
// it has drained. No-op for stale IDs or tasks that already finished.
void task_interrupt_and_wait_for(ID id) {
    AsyncTask* task = &pool::task_data[get_slot_idx(id)];
    if (task->m_id != id || !task->Running()) {
        return;
    }
    task->m_interrupt = true;
    ts.WaitforTask(task);
}
}; // namespace task_system