Skip to content

Commit

Permalink
fixed starv bug
Browse files Browse the repository at this point in the history
  • Loading branch information
twhuang-utah committed Nov 15, 2022
1 parent 1c71e08 commit 59257c1
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 28 deletions.
31 changes: 13 additions & 18 deletions taskflow/core/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ class Executor {
@endcode
*/
int this_worker_id() const;

/**
@brief runs a given function asynchronously
Expand Down Expand Up @@ -1090,10 +1090,12 @@ inline void Executor::_exploit_task(Worker& w, Node*& t) {
// Function: _wait_for_task
inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {

explore_task:
prepare_thief:

++_num_thieves;

explore_task:

_explore_task(worker, t);

// The last thief who successfully stole a task will wake up
Expand All @@ -1108,17 +1110,6 @@ inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
// ---- 2PC guard ----
_notifier.prepare_wait(worker._waiter);

--_num_thieves;

//for(auto& w : _workers) {
// auto queue = std::array{&w._wsq, &_wsq}[w._id == worker._id];
// if(!queue->empty()) {
// worker._vtm = w._id;
// _notifier.cancel_wait(worker._waiter);
// goto explore_task;
// }
//}

if(!_wsq.empty()) {
_notifier.cancel_wait(worker._waiter);
worker._vtm = worker._id;
Expand All @@ -1128,6 +1119,7 @@ inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
if(_done) {
_notifier.cancel_wait(worker._waiter);
_notifier.notify(true);
_num_thieves.fetch_sub(1, std::memory_order_relaxed);
return false;
}

Expand All @@ -1139,6 +1131,8 @@ inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
}
}

--_num_thieves;

/*//if(auto vtm = _find_vtm(me); vtm != _workers.size()) {
if(!_wsq.empty()) {
Expand Down Expand Up @@ -1181,9 +1175,10 @@ inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
}*/

// Now I really need to relinguish my self to others
_notifier.commit_wait(worker._waiter);

goto explore_task;
_notifier.commit_wait(worker._waiter);

goto prepare_thief;
}

// Function: make_observer
Expand Down Expand Up @@ -1234,7 +1229,7 @@ inline void Executor::_schedule(Worker& worker, Node* node) {

// caller is a worker to this pool
if(worker._executor == this) {
if(worker._wsq.push(node, p) && _num_thieves == 0) {
if(worker._wsq.push(node, p) || _num_thieves == 0) {
_notifier.notify(false);
}
return;
Expand Down Expand Up @@ -1285,7 +1280,7 @@ inline void Executor::_schedule(Worker& worker, const SmallVector<Node*>& nodes)
for(size_t i=0; i<num_nodes; ++i) {
auto p = nodes[i]->_priority;
nodes[i]->_state.fetch_or(Node::READY, std::memory_order_release);
if(worker._wsq.push(nodes[i], p) && _num_thieves == 0) {
if(worker._wsq.push(nodes[i], p) || _num_thieves == 0) {
_notifier.notify(false);
}
}
Expand Down
2 changes: 1 addition & 1 deletion taskflow/core/tsq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class TaskQueue {
The operation can trigger the queue to resize its capacity
if more space is required.
*/
TF_FORCE_INLINE bool push(T item, unsigned priority = 0);
TF_FORCE_INLINE bool push(T item, unsigned priority);

/**
@brief pops out an item from the queue
Expand Down
31 changes: 22 additions & 9 deletions unittests/work_stealing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void tsq_owner() {
// push and pop
for(size_t i=0; i<N; ++i) {
gold[i] = &i;
queue.push(gold[i]);
queue.push(gold[i], 0);
}
for(size_t i=0; i<N; ++i) {
auto ptr = queue.pop();
Expand All @@ -30,7 +30,7 @@ void tsq_owner() {

// push and steal
for(size_t i=0; i<N; ++i) {
queue.push(gold[i]);
queue.push(gold[i], 0);
}
// i starts from 1 to avoid cache effect
for(size_t i=1; i<N; ++i) {
Expand Down Expand Up @@ -71,7 +71,7 @@ void tsq_n_thieves(size_t M) {

// master thread
for(size_t i=0; i<N; ++i) {
queue.push(gold[i]);
queue.push(gold[i], 0);
}

std::vector<void*> items;
Expand Down Expand Up @@ -301,39 +301,52 @@ void starvation_test(size_t W) {

REQUIRE(counter == W - W/2);

/*

//TODO: bug? (some extreme situations may run forever ...)
// large linear chain followed by many branches
size_t N = 100000;
size_t target = 0;
taskflow.clear();
counter = 0;

for(size_t l=0; l<N; l++) {
curr = taskflow.emplace([&](){
curr = taskflow.emplace([&, l](){
while(executor.num_thieves() != 0);
//if(l == N-1) {
//printf("worker %d at the last node of the chain\n", executor.this_worker_id());
//}
});
if(l) {
curr.succeed(prev);
}
prev = curr;
}

const int w = rand() % W;

for(size_t b=0; b<N; b++) {
if(b & 1) {
// wait with a probability of 0.9
if(rand() % 10 != 0) {
taskflow.emplace([&](){
if(executor.this_worker_id() != 0) {
while(counter != N/2);
if(executor.this_worker_id() != w) {
//printf("worker %lu enters the loop (t=%lu, c=%lu, w=%d, n=%lu)\n",
// worker->id(), target, counter.load(), w, worker->queue_size()
//);
while(counter != target);
}
}).succeed(curr);
}
// increment the counter with a probability of 0.1
else {
target++;
taskflow.emplace([&](){ ++counter; }).succeed(curr);
}
}

executor.run(taskflow).wait();

REQUIRE(counter == N/2);*/
REQUIRE(counter == target);

}

TEST_CASE("WorkStealing.Starvation.1thread" * doctest::timeout(300)) {
Expand Down

0 comments on commit 59257c1

Please sign in to comment.