forked from cloudius-systems/osv
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpercpu-worker.cc
128 lines (104 loc) · 3.79 KB
/
percpu-worker.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*
* Copyright (C) 2013 Cloudius Systems, Ltd.
*
* This work is open source software, licensed under the terms of the
* BSD license as described in the LICENSE file in the top-level directory.
*/
#include <osv/debug.hh>
#include <osv/sched.hh>
#include <osv/trace.hh>
#include <osv/percpu.hh>
#include <osv/percpu-worker.hh>
TRACEPOINT(trace_pcpu_worker_sheriff_started, "");;
TRACEPOINT(trace_pcpu_worker_item_invoke, "item=%p", worker_item*);
TRACEPOINT(trace_pcpu_worker_item_signal, "item=%p, dest_cpu=%d", worker_item*, unsigned);
TRACEPOINT(trace_pcpu_worker_item_wait, "item=%p", worker_item*);
TRACEPOINT(trace_pcpu_worker_item_end_wait, "item=%p", worker_item*);
TRACEPOINT(trace_pcpu_worker_item_set_finished, "item=%p, dest_cpu=%d", worker_item*, unsigned);
sched::cpu::notifier workman::_cpu_notifier(workman::pcpu_init);
PERCPU(std::atomic<bool>, workman::_duty);
PERCPU(std::atomic<bool>, workman::_ready);
PERCPU(sched::thread*, workman::_work_sheriff);
extern char _percpu_workers_start[];
extern char _percpu_workers_end[];
workman workman_instance;
worker_item::worker_item(std::function<void ()> handler)
{
_handler = handler;
for (unsigned i=0; i < sched::max_cpus; i++) {
_have_work[i].store(false, std::memory_order_relaxed);
}
}
void worker_item::signal(sched::cpu* cpu)
{
trace_pcpu_worker_item_signal(this, cpu->id);
_have_work[cpu->id].store(true, std::memory_order_relaxed);
workman_instance.signal(cpu);
}
bool worker_item::have_work(sched::cpu* cpu)
{
return (_have_work[cpu->id].load(std::memory_order_acquire));
}
void worker_item::set_finished(sched::cpu* cpu)
{
trace_pcpu_worker_item_set_finished(this, cpu->id);
}
void worker_item::clear_work(sched::cpu* cpu)
{
_have_work[cpu->id].store(false, std::memory_order_release);
}
bool workman::signal(sched::cpu* cpu)
{
if (!(*_ready).load(std::memory_order_relaxed)) {
return false;
}
//
// let the sheriff know that he have to do what he have to do.
// we simply set _duty=true and wake the sheriff
//
// when we signal a worker_item, we set 2 variables to true, the per
// worker_item's per-cpu _have_work variable and the global _duty variable
// of the cpu's sheriff we are signaling.
//
// we want the sheriff to see _duty=true only after _have_work=true.
// in case duty=true will be seen before _have_work=true, we may miss
// it in the sheriff thread.
//
(*(_duty.for_cpu(cpu))).store(true, std::memory_order_release);
(*_work_sheriff.for_cpu(cpu))->wake();
return true;
}
void workman::call_of_duty(void)
{
(*_ready).store(true, std::memory_order_relaxed);
trace_pcpu_worker_sheriff_started();
while (true) {
sched::thread::wait_until([&] {
return ((*_duty).load(std::memory_order_relaxed) == true);
});
(*_duty).store(false, std::memory_order_relaxed);
// Invoke all work items that needs handling. FIXME: O(n)
sched::cpu* current = sched::cpu::current();
worker_item* wi = reinterpret_cast<worker_item*>(_percpu_workers_start);
worker_item* end = reinterpret_cast<worker_item*>(_percpu_workers_end);
while (wi != end) {
// Invoke worker item
if (wi->have_work(current)) {
trace_pcpu_worker_item_invoke(wi);
wi->clear_work(current);
wi->_handler();
wi->set_finished(current);
}
wi++;
}
}
}
void workman::pcpu_init()
{
(*_duty).store(false, std::memory_order_relaxed);
auto c = sched::cpu::current();
// Create PCPU Sheriff
*_work_sheriff = sched::thread::make([] { workman::call_of_duty(); },
sched::thread::attr().pin(c).name(std::string("percpu") + std::to_string(c->id)));
(*_work_sheriff)->start();
}