Skip to content

Commit

Permalink
Pause all threads while swapping hash table.
Browse files Browse the repository at this point in the history
We used to hold a global lock around all modifications to the hash table.

Then it was switched to wrapping hash table accesses in a global lock during
hash table expansion, set by notifying each worker thread to change lock
styles. There was a bug here which caused trylocks to be clobbered, because the
specific item locks were not held during the global lock:
https://code.google.com/p/memcached/issues/detail?id=370

The patch previous to this one uses item locks during hash table expansion.
Since the item lock table is always smaller than the hash table, an item lock
will always cover both its new and old buckets.

However, we still need to pause all threads during the pointer swap and setup.
This patch pauses all background threads and worker threads, swaps the hash
table, then unpauses them.

This trades the (possibly significant) slowdown during the hash table copy
for a short total hang at the beginning of each expansion. As before,
those worried about consistent performance can presize the hash table with
`-o hashpower=n`.
  • Loading branch information
dormando committed Dec 28, 2014
1 parent d2676b4 commit 6af7aa0
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 71 deletions.
35 changes: 17 additions & 18 deletions assoc.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <pthread.h>

static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t maintenance_lock = PTHREAD_MUTEX_INITIALIZER;

typedef unsigned long int ub4; /* unsigned 4-byte quantities */
typedef unsigned char ub1; /* unsigned 1-byte quantities */
Expand Down Expand Up @@ -147,11 +147,6 @@ static void assoc_start_expand(void) {
if (started_expanding)
return;

/*With this condition, we can expanding holding only one item lock,
* and it should always be false*/
if (item_lock_hashpower >= hashpower)
return;

started_expanding = true;
pthread_cond_signal(&maintenance_cond);
}
Expand Down Expand Up @@ -209,11 +204,11 @@ int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;

static void *assoc_maintenance_thread(void *arg) {

mutex_lock(&maintenance_lock);
while (do_run_maintenance_thread) {
int ii = 0;

/* As there is only one thread process expanding, and we hold the item
* lock, it seems not necessary to hold the cache_lock . */
/* There is only one expansion thread, so no need to global lock. */
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
item *it, *next;
int bucket;
Expand All @@ -223,7 +218,6 @@ static void *assoc_maintenance_thread(void *arg) {
* is the lowest N bits of the hv, and the bucket of item_locks is
* also the lowest M bits of hv, and N is greater than M.
* So we can process expanding with only one item_lock. cool! */
/*Get item lock for the slot in old hashtable*/
if ((item_lock = item_trylock(expand_bucket))) {
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next;
Expand All @@ -247,9 +241,7 @@ static void *assoc_maintenance_thread(void *arg) {
}

} else {
/*wait for 100ms. since only one expanding thread, it's not
* necessary to sleep a random value*/
usleep(100*1000);
usleep(10*1000);
}

if (item_lock) {
Expand All @@ -260,11 +252,18 @@ static void *assoc_maintenance_thread(void *arg) {

if (!expanding) {
/* We are done expanding.. just wait for next invocation */
mutex_lock(&cache_lock);
started_expanding = false;
pthread_cond_wait(&maintenance_cond, &cache_lock);
pthread_cond_wait(&maintenance_cond, &maintenance_lock);
/* assoc_expand() swaps out the hash table entirely, so we need
* all threads to not hold any references related to the hash
* table while this happens.
* This is instead of a more complex, possibly slower algorithm to
* allow dynamic hash table expansion without causing significant
* wait times.
*/
pause_threads(PAUSE_ALL_THREADS);
assoc_expand();
mutex_unlock(&cache_lock);
pause_threads(RESUME_ALL_THREADS);
}
}
return NULL;
Expand All @@ -281,6 +280,7 @@ int start_assoc_maintenance_thread() {
hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
}
}
pthread_mutex_init(&maintenance_lock, NULL);
if ((ret = pthread_create(&maintenance_tid, NULL,
assoc_maintenance_thread, NULL)) != 0) {
fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
Expand All @@ -290,13 +290,12 @@ int start_assoc_maintenance_thread() {
}

void stop_assoc_maintenance_thread() {
mutex_lock(&cache_lock);
mutex_lock(&maintenance_lock);
do_run_maintenance_thread = 0;
pthread_cond_signal(&maintenance_cond);
mutex_unlock(&cache_lock);
mutex_unlock(&maintenance_lock);

/* Wait for the maintenance thread to stop */
pthread_join(maintenance_tid, NULL);
}


9 changes: 9 additions & 0 deletions items.c
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,15 @@ enum crawler_result_type lru_crawler_crawl(char *slabs) {
return CRAWLER_OK;
}

/* Pause the LRU crawler: while we hold lru_crawler_lock, the crawler
 * thread cannot wake up or move. Pairs with lru_crawler_resume(). */
void lru_crawler_pause(void) {
pthread_mutex_lock(&lru_crawler_lock);
}

/* Release lru_crawler_lock so the crawler thread may run again;
 * pairs with lru_crawler_pause(). */
void lru_crawler_resume(void) {
pthread_mutex_unlock(&lru_crawler_lock);
}

int init_lru_crawler(void) {
if (lru_crawler_initialized == 0) {
if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
Expand Down
2 changes: 2 additions & 0 deletions items.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ int start_item_crawler_thread(void);
int stop_item_crawler_thread(void);
int init_lru_crawler(void);
enum crawler_result_type lru_crawler_crawl(char *slabs);
void lru_crawler_pause(void);
void lru_crawler_resume(void);
13 changes: 6 additions & 7 deletions memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,11 @@ enum network_transport {
udp_transport
};

enum item_lock_types {
ITEM_LOCK_GRANULAR = 0,
ITEM_LOCK_GLOBAL
/* Modes accepted by pause_threads(). The *_WORKER_THREADS values affect
 * only the libevent worker threads (via worker_hang_lock); the *_ALL_THREADS
 * values additionally pause/resume the slab rebalancer and LRU crawler
 * background threads before falling through to the worker case. */
enum pause_thread_types {
PAUSE_WORKER_THREADS = 0,
PAUSE_ALL_THREADS,
RESUME_ALL_THREADS,
RESUME_WORKER_THREADS
};

#define IS_UDP(x) (x == udp_transport)
Expand Down Expand Up @@ -389,7 +391,6 @@ typedef struct {
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
uint8_t item_lock_type; /* use fine-grained or global item lock */
} LIBEVENT_THREAD;

typedef struct {
Expand Down Expand Up @@ -574,13 +575,11 @@ void item_stats_sizes(ADD_STAT add_stats, void *c);
void item_unlink(item *it);
void item_update(item *it);

void item_lock_global(void);
void item_unlock_global(void);
void item_lock(uint32_t hv);
void *item_trylock(uint32_t hv);
void item_trylock_unlock(void *arg);
void item_unlock(uint32_t hv);
void switch_item_lock_type(enum item_lock_types type);
void pause_threads(enum pause_thread_types type);
unsigned short refcount_incr(unsigned short *refcount);
unsigned short refcount_decr(unsigned short *refcount);
void STATS_LOCK(void);
Expand Down
85 changes: 39 additions & 46 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Lock for global stats */
static pthread_mutex_t stats_lock;

/* Lock to cause worker threads to hang up after being woken */
static pthread_mutex_t worker_hang_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;
Expand All @@ -59,10 +62,6 @@ static uint32_t item_lock_count;
unsigned int item_lock_hashpower;
#define hashsize(n) ((unsigned long int)1<<(n))
#define hashmask(n) (hashsize(n)-1)
/* this lock is temporarily engaged during a hash table expansion */
static pthread_mutex_t item_global_lock;
/* thread-specific variable for deeply finding the item lock type */
static pthread_key_t item_lock_type_key;

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

Expand Down Expand Up @@ -112,22 +111,8 @@ unsigned short refcount_decr(unsigned short *refcount) {
#endif
}

/* Convenience functions for calling *only* when in ITEM_LOCK_GLOBAL mode */
void item_lock_global(void) {
mutex_lock(&item_global_lock);
}

void item_unlock_global(void) {
mutex_unlock(&item_global_lock);
}

void item_lock(uint32_t hv) {
uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
} else {
mutex_lock(&item_global_lock);
}
mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

/* Special case. When ITEM_LOCK_GLOBAL mode is enabled, this should become a
Expand All @@ -150,12 +135,7 @@ void item_trylock_unlock(void *lock) {
}

void item_unlock(uint32_t hv) {
uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
} else {
mutex_unlock(&item_global_lock);
}
mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

static void wait_for_thread_registration(int nthreads) {
Expand All @@ -169,25 +149,44 @@ static void register_thread_initialized(void) {
init_count++;
pthread_cond_signal(&init_cond);
pthread_mutex_unlock(&init_lock);
/* Force worker threads to pile up if someone wants us to */
pthread_mutex_lock(&worker_hang_lock);
pthread_mutex_unlock(&worker_hang_lock);
}

void switch_item_lock_type(enum item_lock_types type) {
/* Must not be called with any deeper locks held:
* item locks, cache_lock, stats_lock, etc
*/
void pause_threads(enum pause_thread_types type) {
char buf[1];
int i;

buf[0] = 0;
switch (type) {
case ITEM_LOCK_GRANULAR:
buf[0] = 'l';
case PAUSE_ALL_THREADS:
slabs_rebalancer_pause();
lru_crawler_pause();
case PAUSE_WORKER_THREADS:
buf[0] = 'p';
pthread_mutex_lock(&worker_hang_lock);
break;
case ITEM_LOCK_GLOBAL:
buf[0] = 'g';
case RESUME_ALL_THREADS:
slabs_rebalancer_resume();
lru_crawler_resume();
case RESUME_WORKER_THREADS:
pthread_mutex_unlock(&worker_hang_lock);
break;
default:
fprintf(stderr, "Unknown lock type: %d\n", type);
assert(1 == 0);
break;
}

/* Only send a message if we have one. */
if (buf[0] == 0) {
return;
}

pthread_mutex_lock(&init_lock);
init_count = 0;
for (i = 0; i < settings.num_threads; i++) {
Expand Down Expand Up @@ -374,13 +373,6 @@ static void *worker_libevent(void *arg) {
* all threads have finished initializing.
*/

/* set an indexable thread-specific memory item for the lock type.
* this could be unnecessary if we pass the conn *c struct through
* all item_lock calls...
*/
me->item_lock_type = ITEM_LOCK_GRANULAR;
pthread_setspecific(item_lock_type_key, &me->item_lock_type);

register_thread_initialized();

event_base_loop(me->base, 0);
Expand Down Expand Up @@ -425,13 +417,8 @@ static void thread_libevent_process(int fd, short which, void *arg) {
cqi_free(item);
}
break;
/* we were told to flip the lock type and report in */
case 'l':
me->item_lock_type = ITEM_LOCK_GRANULAR;
register_thread_initialized();
break;
case 'g':
me->item_lock_type = ITEM_LOCK_GLOBAL;
/* we were told to pause and report in */
case 'p':
register_thread_initialized();
break;
}
Expand Down Expand Up @@ -784,6 +771,7 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) {

pthread_mutex_init(&cache_lock, NULL);
pthread_mutex_init(&stats_lock, NULL);
pthread_mutex_init(&worker_hang_lock, NULL);

pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);
Expand All @@ -803,6 +791,13 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) {
power = 13;
}

if (power >= hashpower) {
fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
exit(1);
}

item_lock_count = hashsize(power);
item_lock_hashpower = power;

Expand All @@ -814,8 +809,6 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) {
for (i = 0; i < item_lock_count; i++) {
pthread_mutex_init(&item_locks[i], NULL);
}
pthread_key_create(&item_lock_type_key, NULL);
pthread_mutex_init(&item_global_lock, NULL);

threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {
Expand Down

0 comments on commit 6af7aa0

Please sign in to comment.