Skip to content

Commit

Permalink
do not assume that a semaphore can be freed just because nobody is us…
Browse files Browse the repository at this point in the history
…ing it

Let S be a semaphore, initially 0.  Let thread A execute UP(S);
let thread B execute DOWN(S); free(&S);  It is unclear whether this
code is correct with posix semaphores.  The problem is whether UP()
uses S after allowing DOWN() to continue; this seems to be the
case in the glibc-2.7 implementation, and thus the pattern above
seems to be incorrect.  Avoid using such a pattern, and introduce
a global semaphore for the unavoidable case when nothing else
can be depended upon.
  • Loading branch information
matteo-frigo committed Oct 29, 2008
1 parent 1b1dd4a commit 0123295
Showing 1 changed file with 56 additions and 42 deletions.
98 changes: 56 additions & 42 deletions threads/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@

#define FFTW_WORKER void *

static void os_create_worker(FFTW_WORKER (*worker)(void *arg),
static void os_create_thread(FFTW_WORKER (*worker)(void *arg),
void *arg)
{
pthread_attr_t attr;
Expand All @@ -145,7 +145,7 @@ static void os_create_worker(FFTW_WORKER (*worker)(void *arg),
pthread_attr_destroy(&attr);
}

static void os_destroy_worker(void)
static void os_destroy_thread(void)
{
pthread_exit((void *)0);
}
Expand Down Expand Up @@ -206,7 +206,7 @@ static void os_sem_up(os_sem_t *s)
#define FFTW_WORKER unsigned __stdcall
typedef unsigned (__stdcall *winthread_start) (void *);

static void os_create_worker(winthread_start worker,
static void os_create_thread(winthread_start worker,
void *arg)
{
_beginthreadex((void *)NULL, /* security attrib */
Expand All @@ -217,7 +217,7 @@ static void os_create_worker(winthread_start worker,
(unsigned *)NULL); /* tid */
}

static void os_destroy_worker(void)
static void os_destroy_thread(void)
{
_endthreadex(0);
}
Expand All @@ -232,17 +232,35 @@ static void os_destroy_worker(void)
/* Main code: */
struct worker {
os_sem_t ready;
os_sem_t done;
struct work *volatile w;
struct worker *volatile cdr;
};

static struct worker *make_worker(void)
{
struct worker *q = MALLOC(sizeof(*q), OTHER);
os_sem_init(&q->ready);
os_sem_init(&q->done);
return q;
}

static void *unmake_worker(struct worker *q)
{
os_sem_destroy(&q->done);
os_sem_destroy(&q->ready);
X(ifree)(q);
}

struct work {
spawn_function proc;
spawn_data d;
os_sem_t done;
struct worker *q; /* the worker responsible for performing this work */
};

static os_mutex_t queue_lock;
static os_sem_t termination_semaphore;

static struct worker *volatile worker_queue;
#define WITH_QUEUE_LOCK(what) \
{ \
Expand All @@ -253,12 +271,17 @@ static struct worker *volatile worker_queue;

static FFTW_WORKER worker(void *arg)
{
struct worker ego;
struct work *w = (struct work *)arg;
os_sem_init(&ego.ready);
struct worker *ego = (struct worker *)arg;
struct work *w;

for (;;) {
/* wait until work becomes available */
os_sem_down(&ego->ready);

w = ego->w;

do {
A(w->proc);
/* !w->proc ==> terminate worker */
if (!w->proc) break;

/* do the work */
w->proc(&w->d);
Expand All @@ -270,25 +293,18 @@ static FFTW_WORKER worker(void *arg)

/* enqueue worker into worker queue: */
WITH_QUEUE_LOCK({
ego.cdr = worker_queue;
worker_queue = &ego;
ego->cdr = worker_queue;
worker_queue = ego;
});

/* signal that work is done */
os_sem_up(&w->done);

/* wait until work becomes available */
os_sem_down(&ego.ready);

w = ego.w;
} while (w->proc);
os_sem_up(&ego->done);
}

/* termination protocol */
os_sem_up(&w->done);

os_sem_destroy(&ego.ready);
os_sem_up(&termination_semaphore);

os_destroy_worker();
os_destroy_thread();
/* UNREACHABLE */
return 0;
}
Expand All @@ -298,31 +314,30 @@ static void kill_workforce(void)
struct work w;

w.proc = 0;
os_sem_init(&w.done);

WITH_QUEUE_LOCK({
/* tell all workers that they must terminate.
Because workers enqueue themselves before signaling the
completion of the work, all workers belong to the worker queue
if we get here. Also, all workers are waiting at
os_sem_down(&ego.ready), so we can hold the queue lock without
os_sem_down(ready), so we can hold the queue lock without
deadlocking */
while (worker_queue) {
struct worker *r = worker_queue;
worker_queue = r->cdr;
r->w = &w;
os_sem_up(&r->ready);
os_sem_down(&w.done);
struct worker *q = worker_queue;
worker_queue = q->cdr;
q->w = &w;
os_sem_up(&q->ready);
os_sem_down(&termination_semaphore);
unmake_worker(q);
}
});

os_sem_destroy(&w.done);
}

int X(ithreads_init)(void)
{
os_mutex_init(&queue_lock);
os_sem_init(&termination_semaphore);

WITH_QUEUE_LOCK({
worker_queue = 0;
Expand Down Expand Up @@ -378,8 +393,6 @@ void X(spawn_loop)(int loopmax, int nthr, spawn_function proc, void *data)
struct worker *q = 0;

/* dispatch work W to some worker */
os_sem_init(&w->done);

WITH_QUEUE_LOCK({
if (worker_queue) {
/* a worker is available. Remove it from the
Expand All @@ -389,21 +402,21 @@ void X(spawn_loop)(int loopmax, int nthr, spawn_function proc, void *data)
}
});

if (q) {
/* a worker is available, wake it up */
q->w = w;
os_sem_up(&q->ready);
} else {
if (!q) {
/* no worker is available. Create one */
os_create_worker(worker, w);
q = make_worker();
os_create_thread(worker, q);
}

q->w = w;
w->q = q;
os_sem_up(&q->ready);
}
}

for (i = 0; i < nthr - 1; ++i) {
struct work *w = &r[i];
os_sem_down(&w->done);
os_sem_destroy(&w->done);
os_sem_down(&w->q->done);
}

STACK_FREE(r);
Expand All @@ -414,6 +427,7 @@ void X(threads_cleanup)(void)
{
kill_workforce();
os_mutex_destroy(&queue_lock);
os_sem_destroy(&termination_semaphore);
}

#endif /* HAVE_THREADS */

0 comments on commit 0123295

Please sign in to comment.