Skip to content

Commit

Permalink
use a mutex to resume lthreads in their old scheduler instead of pass…
Browse files Browse the repository at this point in the history
…ing the address in pipe.
  • Loading branch information
halayli committed Jan 2, 2012
1 parent 0ac9946 commit 5dbd2f4
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 38 deletions.
32 changes: 21 additions & 11 deletions src/lthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ _lthread_free(lthread_t *lt)
int
_lthread_resume(lthread_t *lt)
{
int ret = 0;

if (lt->state & bit(LT_NEW))
_lthread_init(lt);

Expand All @@ -146,13 +144,12 @@ _lthread_resume(lthread_t *lt)
_lthread_free(lt);
return -1;
} else {
_save_exec_state(lt);
/* place it in a compute scheduler if needed. */
ret = _save_exec_state(lt);
if (lt->state & bit(LT_PENDING_RUNCOMPUTE)) {
_lthread_compute_add(lt);
lthread_get_sched()->sleeping_state++;
}
return ret;
}

return 0;
Expand All @@ -169,7 +166,7 @@ _lthread_key_create(void)
{
if (pthread_key_create(&lthread_sched_key, _lthread_key_destructor)) {
perror("Failed to allocate sched key");
abort();
abort();
return;
}
pthread_setspecific(lthread_sched_key, NULL);
Expand Down Expand Up @@ -197,7 +194,7 @@ _lthread_init(lthread_t *lt)
lt->state = bit(LT_READY);
}

int
inline int
_restore_exec_state(lthread_t *lt)
{
if (lt->stack_size) {
Expand All @@ -222,7 +219,7 @@ _save_exec_state(lthread_t *lt)
}
if ((lt->stack = calloc(1, size)) == NULL) {
perror("Failed to allocate memory to save stack\n");
return errno;
abort();
}
}

Expand All @@ -236,8 +233,9 @@ _save_exec_state(lthread_t *lt)
void
_sched_free(sched_t *sched)
{
free(lthread_get_sched()->stack);
free(lthread_get_sched());
free(sched->stack);
pthread_mutex_destroy(&sched->compute_mutex);
free(sched);
pthread_setspecific(lthread_sched_key, NULL);
}

Expand Down Expand Up @@ -270,6 +268,18 @@ sched_create(size_t stack_size)
return errno;
}

if (pthread_mutex_init(&new_sched->compute_mutex, NULL) != 0) {
perror("Failed to initialize compute_mutex\n");
_sched_free(new_sched);
return errno;
}

if (pipe(new_sched->compute_pipes) == -1) {
perror("Failed to initialize pipe\n");
_sched_free(new_sched);
return errno;
}

new_sched->stack_size = sched_stack_size;

new_sched->total_lthreads = 0;
Expand All @@ -279,7 +289,6 @@ sched_create(size_t stack_size)
new_sched->sleeping = RB_ROOT;
new_sched->birth = rdtsc();
LIST_INIT(&new_sched->new);
pipe(new_sched->compute_pipes);

bzero(&new_sched->st, sizeof(struct _cpu_state));

Expand Down Expand Up @@ -399,7 +408,8 @@ void
lthread_cond_signal(lthread_cond_t *c)
{
if (c->blocked_lthread != NULL) {
LIST_INSERT_HEAD(&lthread_get_sched()->new, c->blocked_lthread, new_next);
LIST_INSERT_HEAD(&lthread_get_sched()->new, c->blocked_lthread,
new_next);
}

c->blocked_lthread = NULL;
Expand Down
62 changes: 43 additions & 19 deletions src/lthread_compute.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ enum {THREAD_TIMEOUT_BEFORE_EXIT = 60};
pthread_key_t compute_sched_key;
pthread_once_t key_once = PTHREAD_ONCE_INIT;

LIST_HEAD(compute_sched_l, _compute_sched) compute_scheds = LIST_HEAD_INITIALIZER(compute_scheds);
LIST_HEAD(compute_sched_l, _compute_sched) compute_scheds = \
LIST_HEAD_INITIALIZER(compute_scheds);
pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;

static void* _lthread_compute_run(void *arg);
Expand All @@ -41,18 +42,24 @@ lthread_compute_begin(void)
}
}

/* create one if there is no scheduler available */
/* create schedule if there is no scheduler available */
if (compute_sched == NULL) {
if ((compute_sched = _lthread_compute_sched_create()) == NULL) {
pthread_mutex_unlock(&sched_mutex);
return -1;
/* we failed to create a scheduler. Use the first scheduler
* in the list, otherwise return failure.
*/
compute_sched = LIST_FIRST(&compute_scheds);
if (compute_sched == NULL) {
pthread_mutex_unlock(&sched_mutex);
return -1;
}
} else {
LIST_INSERT_HEAD(&compute_scheds, compute_sched, compute_next);
}
lt->compute_sched = compute_sched;
LIST_INSERT_HEAD(&compute_scheds, compute_sched, compute_next);
} else {
lt->compute_sched = compute_sched;
}

lt->compute_sched = compute_sched;

lt->state |= bit(LT_PENDING_RUNCOMPUTE);
pthread_mutex_lock(&lt->compute_sched->lthreads_mutex);
LIST_INSERT_HEAD(&lt->compute_sched->lthreads, lt, compute_next);
Expand Down Expand Up @@ -115,7 +122,8 @@ _lthread_compute_add(lthread_t *lt)
/* change ebp esp to be relative to the new stack address */
lt->st.ebp = lt->compute_sched->st.ebp = (void*)((intptr_t)stack - \
((intptr_t)org_stack - (intptr_t)(lt->st.ebp)));
lt->st.esp = lt->compute_sched->st.esp = (void*)((intptr_t)stack - lt->stack_size);
lt->st.esp = lt->compute_sched->st.esp = (void*)((intptr_t)stack - \
lt->stack_size);

pthread_mutex_lock(&lt->compute_sched->lthreads_mutex);
lt->state &= clearbit(LT_PENDING_RUNCOMPUTE);
Expand Down Expand Up @@ -146,12 +154,20 @@ _lthread_compute_sched_create(void)
if ((compute_sched = calloc(1, sizeof(compute_sched_t))) == NULL)
return NULL;

pthread_mutex_init(&compute_sched->run_mutex, NULL);
pthread_mutex_init(&compute_sched->lthreads_mutex, NULL);
pthread_cond_init(&compute_sched->run_mutex_cond, NULL);
if (pthread_mutex_init(&compute_sched->run_mutex, NULL) != 0 ||
pthread_mutex_init(&compute_sched->lthreads_mutex, NULL) != 0 ||
pthread_cond_init(&compute_sched->run_mutex_cond, NULL) != 0) {
free(compute_sched);
return NULL;
}

if (pthread_create(&pthread,
NULL, _lthread_compute_run, compute_sched) != 0) {
_lthread_compute_sched_free(compute_sched);
return NULL;
}

LIST_INIT(&compute_sched->lthreads);
pthread_create(&pthread, NULL, _lthread_compute_run, compute_sched);

return compute_sched;
}
Expand All @@ -174,6 +190,7 @@ _lthread_compute_save_exec_state(lthread_t *lt)
}
if ((lt->stack = calloc(1, size)) == NULL) {
perror("Failed to allocate memory to save stack\n");
abort();
return errno;
}
}
Expand All @@ -186,7 +203,8 @@ _lthread_compute_save_exec_state(lthread_t *lt)
org_stack = (void **)(lt->sched->stack + lt->sched->stack_size);

/* change ebp & esp back to be relative to the old stack address */
lt->st.ebp = (void*)((intptr_t)org_stack - ((intptr_t)stack - (intptr_t)(lt->st.ebp)));
lt->st.ebp = (void*)((intptr_t)org_stack - ((intptr_t)stack - \
(intptr_t)(lt->st.ebp)));
lt->st.esp = (void*)((intptr_t)org_stack - lt->stack_size);

return 0;
Expand Down Expand Up @@ -227,7 +245,7 @@ _lthread_compute_run(void *arg)
while (1) {
pthread_mutex_lock(&compute_sched->lthreads_mutex);

/* we have no work to do, break and wait 60 secs then exit if possible */
/* we have no work to do, break and wait 60 secs then exit */
if (LIST_EMPTY(&compute_sched->lthreads)) {
pthread_mutex_unlock(&compute_sched->lthreads_mutex);
break;
Expand All @@ -252,14 +270,20 @@ _lthread_compute_run(void *arg)
compute_sched->state = COMPUTE_FREE;

/* resume it back on the prev scheduler */
uintptr_t buf[1] = {(uintptr_t)lt};
write(lt->sched->compute_pipes[1], buf, sizeof(buf));
pthread_mutex_lock(&lt->sched->compute_mutex);
LIST_INSERT_HEAD(&lt->sched->compute, lt, compute_sched_next);
pthread_mutex_unlock(&lt->sched->compute_mutex);

/* signal the prev scheduler in case it was sleeping in a poll */
write(lt->sched->compute_pipes[1], "1", 1);
}

pthread_mutex_lock(&compute_sched->run_mutex);
timeout.tv_sec = time(NULL) + THREAD_TIMEOUT_BEFORE_EXIT; /* wait if we have no work to do, exit */
/* wait if we have no work to do, exit */
timeout.tv_sec = time(NULL) + THREAD_TIMEOUT_BEFORE_EXIT;
timeout.tv_nsec = 0;
status = pthread_cond_timedwait(&compute_sched->run_mutex_cond, &compute_sched->run_mutex, &timeout);
status = pthread_cond_timedwait(&compute_sched->run_mutex_cond,
&compute_sched->run_mutex, &timeout);
pthread_mutex_unlock(&compute_sched->run_mutex);

/* if we didn't timeout, then we got signaled to do some work */
Expand Down
6 changes: 5 additions & 1 deletion src/lthread_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/queue.h"
#include <sys/types.h>
#include <errno.h>
#include <pthread.h>

#if defined(__FreeBSD__) || defined(__APPLE__)
#include <sys/event.h>
Expand Down Expand Up @@ -121,6 +122,7 @@ struct _lthread {
LIST_ENTRY(_lthread) new_next;
LIST_ENTRY(_lthread) sleep_next;
LIST_ENTRY(_lthread) compute_next;
LIST_ENTRY(_lthread) compute_sched_next;
sched_node_t *sched_node;
lthread_l_t *sleep_list;
void *stack;
Expand All @@ -142,6 +144,7 @@ struct _sched {
int total_new_events;
/* lists to save an lthread depending on its state */
lthread_l_t new;
lthread_l_t compute;
struct rb_root sleeping;
uint64_t birth;
void *stack;
Expand All @@ -154,6 +157,7 @@ struct _sched {
struct epoll_event eventlist[LT_MAX_EVENTS];
#endif
int compute_pipes[2];
pthread_mutex_t compute_mutex;
};

typedef enum {
Expand Down Expand Up @@ -183,7 +187,7 @@ void _lthread_wait_for(lthread_t *lt, int fd, lt_event_t e);
int _sched_lthread(lthread_t *lt, uint64_t usecs);
void _desched_lthread(lthread_t *lt);
void clear_rd_wr_state(lthread_t *lt);
int _restore_exec_state(lthread_t *lt);
inline int _restore_exec_state(lthread_t *lt);
int _switch(struct _cpu_state *new_state, struct _cpu_state *cur_state);
int sched_create(size_t stack_size);
int _save_exec_state(lthread_t *lt);
Expand Down
27 changes: 20 additions & 7 deletions src/lthread_sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ _rb_insert(struct rb_root *root, sched_node_t *data)
return 0;
}

static char tmp[100];
void
lthread_join()
{
Expand All @@ -111,21 +112,33 @@ lthread_join()
}
}

/* 3. check if we received any events after lthread_poll */
/* 3. resume lthreads we received from lthread_compute, if any */
while (1) {
pthread_mutex_lock(&sched->compute_mutex);
lt = LIST_FIRST(&sched->compute);
if (lt == NULL) {
pthread_mutex_unlock(&sched->compute_mutex);
break;
}
LIST_REMOVE(lt, compute_sched_next);
pthread_mutex_unlock(&sched->compute_mutex);
sched->sleeping_state--;
_lthread_resume(lt);
}

/* 4. check if we received any events after lthread_poll */
register_rd_interest(sched->compute_pipes[0]);
_lthread_poll();

/* 4. fire up lthreads that are ready to run */
/* 5. fire up lthreads that are ready to run */
while (sched->total_new_events) {
p = --sched->total_new_events;

/* 5. resume lthread_compute thread we received on pipe */
/* We got signaled via pipe to wakeup from polling & rusume compute. * Those lthreads will get handled in step 3.
*/
fd = get_fd(&sched->eventlist[p]);
if (fd == sched->compute_pipes[0]) {
lt = NULL;
read(fd, &lt, sizeof(uintptr_t));
sched->sleeping_state--;
_lthread_resume(lt);
read(fd, &tmp, sizeof(tmp));
continue;
}

Expand Down

0 comments on commit 5dbd2f4

Please sign in to comment.