Skip to content

Commit

Permalink
Patch svn r35189 & patch_from_svn interprets test files better
Browse files Browse the repository at this point in the history
  • Loading branch information
gejun committed Aug 29, 2017
1 parent 38ec549 commit 8b69ecf
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 79 deletions.
5 changes: 3 additions & 2 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ bthread_t bthread_self(void) __THROW {
if (g != NULL && !g->is_current_main_task()/*note*/) {
return g->current_tid();
}
return 0;
return INVALID_BTHREAD;
}

int bthread_equal(bthread_t t1, bthread_t t2) __THROW {
Expand Down Expand Up @@ -314,7 +314,8 @@ int bthread_usleep(uint64_t microseconds) __THROW {
int bthread_yield(void) __THROW {
bthread::TaskGroup* g = bthread::tls_task_group;
if (NULL != g && !g->is_current_pthread_task()) {
return bthread::TaskGroup::yield(&g);
bthread::TaskGroup::yield(&g);
return 0;
}
return pthread_yield();
}
Expand Down
3 changes: 2 additions & 1 deletion src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,8 @@ static void wait_for_butex(void* arg) {
// // Value unmatched or waiter is already woken up by TimerThread, jump
// // back to original bthread.
// TaskGroup* g = tls_task_group;
// g->set_remained(TaskGroup::ready_to_run_in_worker, (void*)g->current_tid());
// ReadyToRunArgs args = { g->current_tid(), false };
// g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
// // 2: Don't run remained because we're already in a remained function
// // otherwise stack may overflow.
// TaskGroup::sched_to(&g, bw->tid, false/*2*/);
Expand Down
74 changes: 33 additions & 41 deletions src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {

if (!skip_remained) {
while (g->_last_context_remained) {
void (*fn)(void*) = g->_last_context_remained;
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
Expand Down Expand Up @@ -362,7 +362,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
}

void TaskGroup::_release_last_context(void* arg) {
TaskMeta* m = (TaskMeta*)arg;
TaskMeta* m = static_cast<TaskMeta*>(arg);
if (m->stack_type() != STACK_TYPE_PTHREAD) {
return_stack(m->release_stack()/*may be NULL*/);
} else {
Expand Down Expand Up @@ -411,10 +411,17 @@ int TaskGroup::start_foreground(TaskGroup** pg,
g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
// NOSIGNAL affects current task, not the new task.
g->set_remained(((using_attr.flags & BTHREAD_NOSIGNAL)
? ready_to_run_in_worker_nosignal
: ready_to_run_in_worker),
(void*)g->current_tid());
RemainedFn fn = NULL;
if (g->current_task()->about_to_quit) {
fn = ready_to_run_in_worker_ignoresignal;
} else {
fn = ready_to_run_in_worker;
}
ReadyToRunArgs args = {
g->current_tid(),
(bool)(using_attr.flags & BTHREAD_NOSIGNAL)
};
g->set_remained(fn, &args);
TaskGroup::sched_to(pg, m->tid);
}
return 0;
Expand Down Expand Up @@ -636,7 +643,7 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
}

while (g->_last_context_remained) {
void (*fn)(void*) = g->_last_context_remained;
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
Expand All @@ -661,24 +668,16 @@ void TaskGroup::destroy_self() {
}
}

void TaskGroup::ready_to_run(bthread_t tid) {
push_rq(tid);
const int additional_signal = _num_nosignal;
_num_nosignal = 0;
_nsignaled += 1 + additional_signal;
_control->signal_task(1 + additional_signal);
}

void TaskGroup::ready_to_run_nosignal(bthread_t tid) {
push_rq(tid);
++_num_nosignal;
}

void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
push_rq(tid);
if (nosignal) {
return ready_to_run_nosignal(tid);
++_num_nosignal;
} else {
const int additional_signal = _num_nosignal;
_num_nosignal = 0;
_nsignaled += 1 + additional_signal;
_control->signal_task(1 + additional_signal);
}
return ready_to_run(tid);
}

void TaskGroup::flush_nosignal_tasks() {
Expand Down Expand Up @@ -737,16 +736,14 @@ void TaskGroup::flush_nosignal_tasks_general() {
return flush_nosignal_tasks_remote();
}

void TaskGroup::ready_to_run_in_worker(void* arg) {
return tls_task_group->ready_to_run((bthread_t)arg);
}

void TaskGroup::ready_to_run_in_worker_nosignal(void* arg) {
return tls_task_group->ready_to_run_nosignal((bthread_t)arg);
void TaskGroup::ready_to_run_in_worker(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
return tls_task_group->ready_to_run(args->tid, args->nosignal);
}

void TaskGroup::ready_to_run_in_worker_ignoresignal(void* arg) {
return tls_task_group->push_rq((bthread_t)arg);
void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
return tls_task_group->push_rq(args->tid);
}

struct SleepArgs {
Expand Down Expand Up @@ -775,7 +772,7 @@ void TaskGroup::_add_sleep_event(void* arg) {
base::microseconds_from_now(e.timeout_us));

if (!sleep_id) {
// TimerThread is stopping, schedule previous thread.
// fail to schedule timer, go back to previous thread.
// TODO(gejun): Need error?
g->ready_to_run(e.tid);
return;
Expand Down Expand Up @@ -810,12 +807,8 @@ void TaskGroup::_add_sleep_event(void* arg) {
// To be consistent with sys_usleep, set errno and return -1 on error.
int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
if (0 == timeout_us) {
int rc = yield(pg);
if (rc == 0) {
return 0;
}
errno = rc;
return -1;
yield(pg);
return 0;
}
TaskGroup* g = *pg;
// We have to schedule timer after we switched to next bthread otherwise
Expand Down Expand Up @@ -856,12 +849,11 @@ int TaskGroup::stop_usleep(bthread_t tid) {
return 0;
}

int TaskGroup::yield(TaskGroup** pg) {
void TaskGroup::yield(TaskGroup** pg) {
TaskGroup* g = *pg;
g->set_remained(ready_to_run_in_worker_ignoresignal,
(void*)g->current_tid());
ReadyToRunArgs args = { g->current_tid(), true };
g->set_remained(ready_to_run_in_worker_ignoresignal, &args);
sched(pg);
return 0;
}

void print_task(std::ostream& os, bthread_t tid) {
Expand Down
22 changes: 11 additions & 11 deletions src/bthread/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,17 @@ class TaskGroup {
static void ending_sched(TaskGroup** pg);

// Suspend caller and run bthread `next_tid' in TaskGroup *pg.
// Purpose of this function is to avoid pushing `next_tid' to local
// runqueue and then calling sched(pg), which has similar effect but
// slower.
// Purpose of this function is to avoid pushing `next_tid' to _rq and
// then being popped by sched(pg), which is not necessary.
static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
static void sched_to(TaskGroup** pg, bthread_t next_tid);
static void exchange(TaskGroup** pg, bthread_t next_tid);

// The callback will be run in the beginning of next-run bthread.
// Can't be called by current bthread directly because it often needs
// the target to be suspended already.
void set_remained(void (*cb)(void*), void* arg) {
typedef void (*RemainedFn)(void*);
void set_remained(RemainedFn cb, void* arg) {
_last_context_remained = cb;
_last_context_remained_arg = arg;
}
Expand All @@ -88,8 +88,7 @@ class TaskGroup {

// Suspend caller and run another bthread. When the caller will resume
// is undefined.
// Returns 0 on success, -1 otherwise and errno is set.
static int yield(TaskGroup** pg);
static void yield(TaskGroup** pg);

// Suspend caller until bthread `tid' terminates.
static int join(bthread_t tid, void** return_value);
Expand Down Expand Up @@ -132,9 +131,7 @@ class TaskGroup {
int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; }

// Push a bthread into the runqueue
void ready_to_run(bthread_t tid);
void ready_to_run(bthread_t tid, bool nosignal);
void ready_to_run_nosignal(bthread_t tid);
void ready_to_run(bthread_t tid, bool nosignal = false);
// Flush tasks pushed to rq but signalled.
void flush_nosignal_tasks();

Expand Down Expand Up @@ -182,8 +179,11 @@ friend class TaskControl;
// Callbacks for set_remained()
static void _release_last_context(void*);
static void _add_sleep_event(void*);
struct ReadyToRunArgs {
bthread_t tid;
bool nosignal;
};
static void ready_to_run_in_worker(void*);
static void ready_to_run_in_worker_nosignal(void*);
static void ready_to_run_in_worker_ignoresignal(void*);

// Wait for a task to run.
Expand Down Expand Up @@ -216,7 +216,7 @@ friend class TaskControl;
int64_t _cumulated_cputime_ns;

size_t _nswitch;
void (*_last_context_remained)(void*);
RemainedFn _last_context_remained;
void* _last_context_remained_arg;

ParkingLot* _pl;
Expand Down
7 changes: 6 additions & 1 deletion src/bthread/task_group_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ inline void TaskGroup::exchange(TaskGroup** pg, bthread_t next_tid) {
if (g->is_current_pthread_task()) {
return g->ready_to_run(next_tid);
}
ReadyToRunArgs args = { g->current_tid(), false };
g->set_remained((g->current_task()->about_to_quit
? ready_to_run_in_worker_ignoresignal
: ready_to_run_in_worker),
(void*)g->current_tid());
&args);
TaskGroup::sched_to(pg, next_tid);
}

Expand Down Expand Up @@ -73,6 +74,10 @@ inline void TaskGroup::push_rq(bthread_t tid) {
// baidu-rpc)
flush_nosignal_tasks();
LOG_EVERY_SECOND(ERROR) << "_rq is full, capacity=" << _rq.capacity();
// TODO(gejun): May cause deadlock when all workers are spinning here.
// A better solution is to pop and run existing bthreads, however which
// make set_remained()-callbacks do context switches and need extensive
// reviews on related code.
::usleep(1000);
}
}
Expand Down
46 changes: 34 additions & 12 deletions test/bthread_cond_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,24 +424,46 @@ void* wait_cond_thread(void* arg) {
return NULL;
}

TEST(CondTest, too_many_bthreads) {
std::vector<bthread_t> th;
th.resize(32768);
static void launch_many_bthreads() {
g_stop = false;
bthread_t tid;
BthreadCond c;
c.Init();
bthread_t tid;
base::Timer tm;
bthread_start_urgent(&tid, &BTHREAD_ATTR_PTHREAD, wait_cond_thread, &c);
for (size_t i = 0; i < th.size(); ++i) {
bthread_start_background(&th[i], NULL, usleep_thread, NULL);
}
c.Signal();
std::vector<bthread_t> tids;
tids.reserve(32768);
tm.start();
for (size_t i = 0; i < 32768; ++i) {
bthread_t t0;
ASSERT_EQ(0, bthread_start_background(&t0, NULL, usleep_thread, NULL));
tids.push_back(t0);
}
tm.stop();
LOG(INFO) << "Creating bthreads took " << tm.u_elapsed() << " us";
usleep(3 * 1000 * 1000L);
c.Signal();
g_stop = true;
bthread_join(tid, NULL);
ASSERT_TRUE(started_wait);
ASSERT_TRUE(ended_wait);
for (size_t i = 0; i < th.size(); ++i) {
bthread_join(th[i], NULL);
for (size_t i = 0; i < tids.size(); ++i) {
LOG_EVERY_SECOND(INFO) << "Joined " << i << " threads";
bthread_join(tids[i], NULL);
}
LOG_EVERY_SECOND(INFO) << "Joined " << tids.size() << " threads";
}

TEST(CondTest, too_many_bthreads_from_pthread) {
launch_many_bthreads();
}

static void* run_launch_many_bthreads(void*) {
launch_many_bthreads();
return NULL;
}

TEST(CondTest, too_many_bthreads_from_bthread) {
bthread_t th;
ASSERT_EQ(0, bthread_start_urgent(&th, NULL, run_launch_many_bthreads, NULL));
bthread_join(th, NULL);
}
} // namespace
7 changes: 2 additions & 5 deletions test/bthread_fd_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ TEST(FDTest, read_kernel_version) {

volatile bool stop = false;

const size_t NCLIENT = 30;

struct SocketMeta {
int fd;
int epfd;
Expand All @@ -62,6 +60,7 @@ struct EpollMeta {
int epfd;
};

const size_t NCLIENT = 30;
void* process_thread(void* arg) {
SocketMeta* m = (SocketMeta*)arg;
size_t count;
Expand Down Expand Up @@ -148,9 +147,7 @@ void* client_thread(void* arg) {
ssize_t rc;
do {
const int wait_rc = bthread_fd_wait(m->fd, EPOLLIN);
if (__builtin_expect(wait_rc != 0, 0)) {
EXPECT_EQ(0, wait_rc) << berror();
}
EXPECT_EQ(0, wait_rc) << berror();
rc = read(m->fd, &m->count, sizeof(m->count));
} while (rc < 0 && errno == EAGAIN);
#else
Expand Down
24 changes: 18 additions & 6 deletions tools/patch_from_svn
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,23 @@ if [ -d "$1/.svn" ]; then
cd $CURRENT_DIR
fi

MODIFIED_PATCHFILE="$(basename $PATCHFILE).brpc_os"
MODIFIED_PATCHFILE="$(dirname $PATCHFILE)/brpc_os.$(basename $PATCHFILE)"

# guess prefix of test files
TEST_PREFIX="test_"
TEST_SUFFIX=
if fgrep -q " bthread/" $PATCHFILE; then
TEST_PREFIX="bthread_"
TEST_SUFFIX="_unittest"
elif fgrep -q " bvar/" $PATCHFILE; then
TEST_PREFIX="bvar_"
TEST_SUFFIX="_unittest"
fi

cat $PATCHFILE | sed -e 's/src\/baidu\/rpc\//src\/brpc\//g' \
-e 's/\<baidu\/rpc\//brpc\//g' \
-e 's/src\/brpc\/test\/test_\(.*\)\.cpp/test\/brpc_\1_unittest.cpp/g' \
-e 's/\<src\/brpc\/test\/test_\(.*\)\.cpp/test\/brpc_\1_unittest.cpp/g' \
-e "s/\<test\/test_\(.*\)\.cpp/test\/${TEST_PREFIX}\1${TEST_SUFFIX}.cpp/g" \
-e 's/\<namespace \+baidu *{/namespace brpc {/g' \
-e 's/\<namespace \+rpc *{//g' \
-e 's/} *\/\/ \+namespace \+baidu/} \/\/ namespace brpc/g' \
Expand All @@ -34,10 +46,10 @@ cat $PATCHFILE | sed -e 's/src\/baidu\/rpc\//src\/brpc\//g' \
-e 's/\<protocol\/\(.*\)\.proto/src\/\1.proto/g' \
-e 's/TEST_F(HttpMessageTest/TEST(HttpMessageTest/g' \
-e 's/TEST_F(URITest/TEST(URITest/g' \
-e 's/bthread_cond\.cpp/src\/bthread\/condition_variable.cpp/g' \
-e 's/bthread_\([^.]*\)\.cpp/src\/bthread\/\1.cpp/g' \
-e 's/bthread_\([^.]*\)\.h/src\/bthread\/\1.h/g' \
-e 's/bthread\.h/src\/bthread\/bthread.h/g' \
-e 's/ bthread_cond\.cpp/ src\/bthread\/condition_variable.cpp/g' \
-e 's/ bthread_\([^.]*\)\.cpp/ src\/bthread\/\1.cpp/g' \
-e 's/ bthread_\([^.]*\)\.h/ src\/bthread\/\1.h/g' \
-e 's/ bthread\.\(h\|cpp\)/ src\/bthread\/bthread.\1/g' \
-e 's/<\(brpc\/[^>]*\)>/"\1"/g' \
-e 's/<\(bvar\/[^>]*\)>/"\1"/g' \
-e 's/<\(base\/[^>]*\)>/"\1"/g' \
Expand Down

0 comments on commit 8b69ecf

Please sign in to comment.