Skip to content

Commit

Permalink
r35401: Refactored interruptions on bthreads. The semantic is more si…
Browse files Browse the repository at this point in the history
…milar to pthread's and EINTR is returned instead of ESTOP.
  • Loading branch information
gejun committed Oct 24, 2017
1 parent 412143e commit 5a898e1
Show file tree
Hide file tree
Showing 17 changed files with 318 additions and 315 deletions.
25 changes: 8 additions & 17 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomi
pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
// Referenced in rpc, needs to be extern.
// Notice that we can't declare the variable as atomic<TaskControl*> which
// may not initialized before creating bthreads before main().
// are not constructed before main().
TaskControl* g_task_control = NULL;

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
Expand Down Expand Up @@ -106,8 +106,6 @@ start_from_non_worker(bthread_t* __restrict tid,
tid, attr, fn, arg);
}

int stop_butex_wait(bthread_t tid);

struct TidTraits {
static const size_t BLOCK_SIZE = 63;
static const size_t MAX_ENTRIES = 65536;
Expand Down Expand Up @@ -169,23 +167,17 @@ void bthread_flush() __THROW {
}
}

int bthread_interrupt(bthread_t tid) __THROW {
return bthread::TaskGroup::interrupt(tid, bthread::get_task_control());
}

int bthread_stop(bthread_t tid) __THROW {
if (bthread::stop_butex_wait(tid) < 0) {
return errno;
}
bthread::TaskGroup* g = bthread::tls_task_group;
if (!g) {
bthread::TaskControl* c = bthread::get_or_new_task_control();
if (!c) {
return ENOMEM;
}
g = c->choose_one_group();
}
return g->stop_usleep(tid);
bthread::TaskGroup::set_stopped(tid);
return bthread_interrupt(tid);
}

int bthread_stopped(bthread_t tid) __THROW {
return bthread::TaskGroup::stopped(tid);
return (int)bthread::TaskGroup::is_stopped(tid);
}

bthread_t bthread_self(void) __THROW {
Expand Down Expand Up @@ -319,7 +311,6 @@ int bthread_usleep(uint64_t microseconds) __THROW {
if (NULL != g && !g->is_current_pthread_task()) {
return bthread::TaskGroup::usleep(&g, microseconds);
}
// TODO: return ESTOP for pthread_task
return ::usleep(microseconds);
}

Expand Down
51 changes: 38 additions & 13 deletions src/bthread/bthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,55 @@

__BEGIN_DECLS

// Create bthread `fn(arg)' with attributes `attr' and put the identifier into
// Create bthread `fn(args)' with attributes `attr' and put the identifier into
// `tid'. Switch to the new thread and schedule old thread to run. Use this
// function when the new thread is more urgent.
// Returns 0 on success, errno otherwise.
extern int bthread_start_urgent(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) __THROW;
void* __restrict args) __THROW;

// Create bthread `fn(arg)' with attributes `attr' and put the identifier into
// Create bthread `fn(args)' with attributes `attr' and put the identifier into
// `tid'. This function behaves closer to pthread_create: after scheduling the
// new thread to run, it returns. In another word, the new thread may take
// longer time than bthread_start_urgent() to run.
// Return 0 on success, errno otherwise.
extern int bthread_start_background(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) __THROW;
void* __restrict args) __THROW;

// Wake up operations blocking the thread. Different functions may behave
// differently:
// bthread_usleep(): returns -1 and sets errno to ESTOP if bthread_stop()
// is called, or to EINTR otherwise.
// butex_wait(): returns -1 and sets errno to EINTR
// bthread_mutex_*lock: unaffected (still blocking)
// bthread_cond_*wait: wakes up and returns 0.
// bthread_*join: unaffected.
// Common usage of interruption is to make a thread to quit ASAP.
// [Thread1] [Thread2]
// set stopping flag
// bthread_interrupt(Thread2)
// wake up
// see the flag and quit
// may block again if the flag is unchanged
// bthread_interrupt() guarantees that Thread2 is woken up reliably no matter
// how the 2 threads are interleaved.
// Returns 0 on success, errno otherwise.
extern int bthread_interrupt(bthread_t tid) __THROW;

// Ask the bthread `tid' to stop. Operations which would suspend the thread
// except bthread_join will not block, instead they return ESTOP.
// This is a cooperative stopping mechanism.
// Make bthread_stopped() on the bthread return true and interrupt the bthread.
// Note that current bthread_stop() solely sets the built-in "stop flag" and
// calls bthread_interrupt(), which is different from earlier versions of
// bthread, and replaceable by user-defined stop flags plus calls to
// bthread_interrupt().
// Returns 0 on success, errno otherwise.
extern int bthread_stop(bthread_t tid) __THROW;

// Returns 1 iff bthread_stop() was called on the thread or the thread does
// not exist, 0 otherwise.
// Returns 1 iff bthread_stop(tid) was called or the thread does not exist,
// 0 otherwise.
extern int bthread_stopped(bthread_t tid) __THROW;

// Returns identifier of caller if caller is a bthread, 0 otherwise(Id of a
Expand All @@ -75,10 +97,12 @@ extern int bthread_equal(bthread_t t1, bthread_t t2) __THROW;
extern void bthread_exit(void* retval) __attribute__((__noreturn__));

// Make calling thread wait for termination of bthread `bt'. Return immediately
// if `bt' is already terminated. The exit status of the bthread shall be
// stored in *bthread_return (if it's not NULL), however at present it's
// always set to NULL. There's no "detachment" in bthreads, all bthreads are
// "detached" as default and still joinable.
// if `bt' is already terminated.
// Notes:
// - All bthreads are "detached" but still joinable.
// - *bthread_return is always set to null. If you need to return value
// from a bthread, pass the value via the `args' created the bthread.
// - bthread_join() is not affected by bthread_interrupt.
// Returns 0 on success, errno otherwise.
extern int bthread_join(bthread_t bt, void** bthread_return) __THROW;

Expand Down Expand Up @@ -125,6 +149,7 @@ extern int bthread_setconcurrency(int num) __THROW;
extern int bthread_yield(void) __THROW;

// Suspend current thread for at least `microseconds'
// Interruptible by bthread_interrupt().
extern int bthread_usleep(uint64_t microseconds) __THROW;

// ---------------------------------------------
Expand Down
Loading

0 comments on commit 5a898e1

Please sign in to comment.