prototype functionality for LRU metadumper
Functionality is nearly all there. A handful of FIXMEs and TODOs remain to be addressed.
From there it needs to be refactored into something proper.
dormando committed Aug 20, 2016
1 parent c793bae commit a8347f6
Showing 6 changed files with 260 additions and 19 deletions.
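The new "lru_crawler metadump" command (wired up in memcached.c below) takes the same class-id argument as the existing "lru_crawler crawl" command and streams one line of metadata per live item back to the requesting client, using the format string in item_crawler_metadump() in items.c. As an illustration only, and not part of this commit, a client-side parser for one such line could look like the sketch below; the buffer sizes and the function name are assumptions.

#include <stdio.h>

/* Illustrative sketch: parse one metadump line of the form
 *   key=<uriencoded key> exp=<unix time or -1> la=<last access> cas=<cas> fetch=yes|no
 * as produced by the snprintf() call in item_crawler_metadump(). */
static int parse_metadump_line(const char *line) {
    char key[1024];              /* uriencoded key; size is an assumption */
    long exp;                    /* absolute expiry time; -1 means never expires */
    unsigned int la;             /* last-access time as dumped */
    unsigned long long cas;
    char fetched[4];             /* "yes" or "no" */
    if (sscanf(line, "key=%1023s exp=%ld la=%u cas=%llu fetch=%3s",
               key, &exp, &la, &cas, fetched) != 5) {
        return -1;               /* malformed or truncated line */
    }
    printf("%s exp=%ld la=%u cas=%llu fetched=%s\n", key, exp, la, cas, fetched);
    return 0;
}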
195 changes: 178 additions & 17 deletions items.c
@@ -13,6 +13,7 @@
#include <time.h>
#include <assert.h>
#include <unistd.h>
#include <poll.h>

/* Forward Declarations */
static void item_link_q(item *it);
@@ -56,9 +57,17 @@ typedef struct {
bool run_complete;
} crawlerstats_t;

typedef struct {
void *c; /* original connection structure. still with source thread attached. */
int sfd; /* client fd. */
bipbuf_t *buf; /* output buffer */
char *cbuf; /* current buffer */
} crawler_client_t;

static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];
static crawler crawlers[LARGEST_ID];
static crawler_client_t crawler_client;
static itemstats_t itemstats[LARGEST_ID];
static unsigned int sizes[LARGEST_ID];
static uint64_t sizes_bytes[LARGEST_ID];
@@ -91,7 +100,7 @@ void item_stats_reset(void) {

static int lru_pull_tail(const int orig_id, const int cur_lru,
const uint64_t total_bytes, const bool do_evict);
static int lru_crawler_start(uint32_t id, uint32_t remaining);
static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type);

/* Get the next CAS id for a new item. */
/* TODO: refactor some atomics for this. */
@@ -1050,7 +1059,7 @@ static void lru_maintainer_crawler_check(void) {
crawlerstats_t *s = &crawlerstats[i];
/* We've not successfully kicked off a crawl yet. */
if (last_crawls[i] == 0) {
if (lru_crawler_start(i, 0) > 0) {
if (lru_crawler_start(i, 0, CRAWLER_EXPIRED) > 0) {
last_crawls[i] = current_time;
}
}
@@ -1336,6 +1345,123 @@ static void item_crawler_evaluate(item *search, uint32_t hv, int i) {
}
}

static void item_crawler_metadump(item *it, uint32_t hv, int i) {
//int slab_id = CLEAR_LRU(i);
char keybuf[KEY_MAX_LENGTH * 3 + 1];
int is_flushed = item_is_flushed(it);
/* Ignore expired content. */
if ((it->exptime != 0 && it->exptime < current_time)
|| is_flushed) {
refcount_decr(&it->refcount);
return;
}
// TODO: uriencode directly into the buffer.
uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_LENGTH * 3 + 1);
int total = snprintf(crawler_client.cbuf, 4096,
"key=%s exp=%ld la=%u cas=%llu fetch=%s\n",
keybuf,
(it->exptime == 0) ? -1 : (long)it->exptime + process_started,
it->time,
(unsigned long long)ITEM_get_cas(it),
(it->it_flags & ITEM_FETCHED) ? "yes" : "no");
refcount_decr(&it->refcount);
// TODO: some way of tracking the errors. these are very unlikely though.
if (total >= 4096 || total <= 0) {
/* Failed to write, don't push it. */
return;
}
bipbuf_push(crawler_client.buf, total);
}

static void item_crawler_close_client(crawler_client_t *c) {
fprintf(stderr, "CRAWLER: Closing client\n");
sidethread_conn_close(c->c);
c->c = NULL;
c->cbuf = NULL;
bipbuf_free(c->buf);
c->buf = NULL;
}

static int item_crawler_metadump_poll(crawler_client_t *c) {
unsigned char *data;
unsigned int data_size = 0;
struct pollfd to_poll[1];
to_poll[0].fd = c->sfd;
to_poll[0].events = POLLOUT;

int ret = poll(to_poll, 1, 1000);

if (ret < 0) {
// fatal.
return -1;
}

if (ret == 0) return 0;

if (to_poll[0].revents & POLLIN) {
char buf[1];
int res = read(c->sfd, buf, 1);
if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
item_crawler_close_client(c);
return -1;
}
}
if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) {
if (to_poll[0].revents & (POLLHUP|POLLERR)) {
item_crawler_close_client(c);
return -1;
} else if (to_poll[0].revents & POLLOUT) {
int total = write(c->sfd, data, data_size);
if (total == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
item_crawler_close_client(c);
return -1;
}
} else if (total == 0) {
item_crawler_close_client(c);
return -1;
} else {
bipbuf_poll(c->buf, total);
}
}
}
return 0;
}

/* Grab some space to work with; if none exists, run the poll() loop and wait
 * for it to clear up or close.
 * Returns -1 if the client has been closed.
*/
static int item_crawler_metadump_getbuf(crawler_client_t *c) {
void *buf = NULL;
if (c->c == NULL) return -1;
/* not enough space. */
while ((buf = bipbuf_request(c->buf, 4096)) == NULL) {
// TODO: max loops before closing.
int ret = item_crawler_metadump_poll(c);
if (ret < 0) return ret;
}

c->cbuf = buf;
return 0;
}

static void item_crawler_class_done(int i) {
crawlers[i].it_flags = 0;
crawler_count--;
crawler_unlink_q((item *)&crawlers[i]);
pthread_mutex_unlock(&lru_locks[i]);
pthread_mutex_lock(&lru_crawler_stats_lock);
crawlerstats[CLEAR_LRU(i)].end_time = current_time;
crawlerstats[CLEAR_LRU(i)].run_complete = true;
pthread_mutex_unlock(&lru_crawler_stats_lock);

if (crawlers[i].type == CRAWLER_METADUMP && crawler_client.c != NULL) {
item_crawler_metadump_poll(&crawler_client);
item_crawler_close_client(&crawler_client);
}
}

static void *item_crawler_thread(void *arg) {
int i;
int crawls_persleep = settings.crawls_persleep;
@@ -1356,20 +1482,23 @@ static void *item_crawler_thread(void *arg) {
if (crawlers[i].it_flags != 1) {
continue;
}

/* Get memory from the bipbuf; if the client has no space, flush first. */
// TODO: Will become a callback run here.
if (crawlers[i].type == CRAWLER_METADUMP) {
int ret = item_crawler_metadump_getbuf(&crawler_client);
if (ret != 0) {
item_crawler_class_done(i);
continue;
}
}
pthread_mutex_lock(&lru_locks[i]);
search = crawler_crawl_q((item *)&crawlers[i]);
if (search == NULL ||
(crawlers[i].remaining && --crawlers[i].remaining < 1)) {
if (settings.verbose > 2)
fprintf(stderr, "Nothing left to crawl for %d\n", i);
crawlers[i].it_flags = 0;
crawler_count--;
crawler_unlink_q((item *)&crawlers[i]);
pthread_mutex_unlock(&lru_locks[i]);
pthread_mutex_lock(&lru_crawler_stats_lock);
crawlerstats[CLEAR_LRU(i)].end_time = current_time;
crawlerstats[CLEAR_LRU(i)].run_complete = true;
pthread_mutex_unlock(&lru_crawler_stats_lock);
item_crawler_class_done(i);
continue;
}
uint32_t hv = hash(ITEM_key(search), search->nkey);
@@ -1392,13 +1521,17 @@ static void *item_crawler_thread(void *arg) {
/* Frees the item or decrements the refcount. */
/* Interface for this could improve: do the free/decr here
* instead? */
if (crawlers[i].type == CRAWLER_METADUMP)
pthread_mutex_unlock(&lru_locks[i]);

pthread_mutex_lock(&lru_crawler_stats_lock);
item_crawler_evaluate(search, hv, i);
crawlers[i].eval(search, hv, i);
pthread_mutex_unlock(&lru_crawler_stats_lock);

if (hold_lock)
item_trylock_unlock(hold_lock);
pthread_mutex_unlock(&lru_locks[i]);
if (crawlers[i].type != CRAWLER_METADUMP)
pthread_mutex_unlock(&lru_locks[i]);

if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
usleep(settings.lru_crawler_sleep);
@@ -1470,7 +1603,7 @@ int start_item_crawler_thread(void) {
/* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
* LRU every time.
*/
static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_run_type type) {
int i;
uint32_t sid;
uint32_t tocrawl[3];
@@ -1493,6 +1626,16 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
crawlers[sid].time = 0;
crawlers[sid].remaining = remaining;
crawlers[sid].slabs_clsid = sid;
crawlers[sid].type = type;
switch (type) {
case CRAWLER_METADUMP:
crawlers[sid].eval = item_crawler_metadump;
break;
case CRAWLER_EXPIRED:
default:
crawlers[sid].eval = item_crawler_evaluate;
break;
}
crawler_link_q((item *)&crawlers[sid]);
crawler_count++;
starts++;
@@ -1512,25 +1655,43 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
return starts;
}

static int lru_crawler_start(uint32_t id, uint32_t remaining) {
static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type) {
int starts;
if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
return 0;
}
starts = do_lru_crawler_start(id, remaining);
starts = do_lru_crawler_start(id, remaining, type);
if (starts) {
pthread_cond_signal(&lru_crawler_cond);
}
pthread_mutex_unlock(&lru_crawler_lock);
return starts;
}

/* FIXME: Temporary hack since we can't yet pass this information into
* lru_crawler_crawl(), which has the proper locks/etc.
* Multiple parallel commands could race, but that case isn't covered by the tests.
*/
int lru_crawler_set_client(void *c, const int sfd) {
if (crawler_client.c != NULL) {
return -1;
}
crawler_client.c = c;
crawler_client.sfd = sfd;

crawler_client.buf = bipbuf_new(1024 * 128);
if (crawler_client.buf == NULL) {
return -2;
}
return 0;
}

/* FIXME: Split this into two functions: one to kick a crawler for a sid, and one to
* parse the string. LRU maintainer code is generating a string to set up a
* sid.
* Also only clear the crawlerstats once per sid.
*/
enum crawler_result_type lru_crawler_crawl(char *slabs) {
enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type) {
char *b = NULL;
uint32_t sid = 0;
int starts = 0;
@@ -1561,7 +1722,7 @@ enum crawler_result_type lru_crawler_crawl(char *slabs) {

for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
if (tocrawl[sid])
starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl);
starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl, type);
}
if (starts) {
pthread_cond_signal(&lru_crawler_cond);
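The streaming path above leans on the bipbuf in a reserve/commit and peek/consume pattern: item_crawler_metadump_getbuf() reserves 4096 bytes with bipbuf_request(), item_crawler_metadump() commits only the bytes it wrote with bipbuf_push(), and item_crawler_metadump_poll() drains the readable region with bipbuf_peek_all()/bipbuf_poll(). A minimal standalone sketch of that pattern, using only the bipbuf calls that appear in this diff (the header name is an assumption, and error handling is trimmed):

#include <stdio.h>
#include "bipbuffer.h"

/* Hedged sketch of the bipbuf usage pattern in items.c above: reserve space,
 * write into it, commit what was actually used, then peek and consume. */
static void bipbuf_pattern_demo(void) {
    bipbuf_t *bb = bipbuf_new(1024 * 128);           /* same size as crawler_client.buf */
    if (bb == NULL)
        return;

    unsigned char *wr = bipbuf_request(bb, 4096);    /* reserve, as getbuf() does */
    if (wr != NULL) {
        int total = snprintf((char *)wr, 4096, "key=example exp=-1\n");
        if (total > 0 && total < 4096)
            bipbuf_push(bb, total);                  /* commit only the bytes written */
    }

    unsigned int data_size = 0;
    unsigned char *rd = bipbuf_peek_all(bb, &data_size);
    if (rd != NULL) {
        fwrite(rd, 1, data_size, stdout);            /* stand-in for write(c->sfd, ...) */
        bipbuf_poll(bb, data_size);                  /* consume what was flushed */
    }
    bipbuf_free(bb);
}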
3 changes: 2 additions & 1 deletion items.h
@@ -47,6 +47,7 @@ void lru_maintainer_resume(void);
int start_item_crawler_thread(void);
int stop_item_crawler_thread(void);
int init_lru_crawler(void);
enum crawler_result_type lru_crawler_crawl(char *slabs);
enum crawler_result_type lru_crawler_crawl(char *slabs, enum crawler_run_type);
int lru_crawler_set_client(void *c, const int sfd); /* FIXME: Temporary. */
void lru_crawler_pause(void);
void lru_crawler_resume(void);
33 changes: 32 additions & 1 deletion memcached.c
@@ -3964,7 +3964,7 @@ static void process_command(conn *c, char *command) {
return;
}

rv = lru_crawler_crawl(tokens[2].value);
rv = lru_crawler_crawl(tokens[2].value, CRAWLER_EXPIRED);
switch(rv) {
case CRAWLER_OK:
out_string(c, "OK");
@@ -3980,6 +3980,33 @@
break;
}
return;
} else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "metadump") == 0) {
if (settings.lru_crawler == false) {
out_string(c, "CLIENT_ERROR lru crawler disabled");
return;
}

// FIXME: check response code.
lru_crawler_set_client(c, c->sfd);
int rv = lru_crawler_crawl(tokens[2].value, CRAWLER_METADUMP);
switch(rv) {
case CRAWLER_OK:
out_string(c, "OK");
// TODO: Don't reuse conn_watch here.
conn_set_state(c, conn_watch);
event_del(&c->event);
break;
case CRAWLER_RUNNING:
out_string(c, "BUSY currently processing crawler request");
break;
case CRAWLER_BADCLASS:
out_string(c, "BADCLASS invalid class id");
break;
case CRAWLER_NOTSTARTED:
out_string(c, "NOTSTARTED no items to crawl");
break;
}
return;
} else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "tocrawl") == 0) {
uint32_t tocrawl;
if (!safe_strtoul(tokens[2].value, &tocrawl)) {
@@ -4015,6 +4042,7 @@ static void process_command(conn *c, char *command) {
} else {
out_string(c, "ERROR failed to stop lru crawler thread");
}
} else if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "metadump") == 0)) {
} else {
out_string(c, "ERROR");
}
Expand Down Expand Up @@ -6372,6 +6400,9 @@ int main (int argc, char **argv) {
/* Drop privileges no longer needed */
drop_privileges();

/* Initialize the uriencode lookup table. */
uriencode_init();

/* enter the event loop */
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;
9 changes: 9 additions & 0 deletions memcached.h
@@ -419,6 +419,13 @@ typedef struct _stritem {
/* then data with terminating \r\n (no terminating null; it's binary!) */
} item;

typedef void (*crawler_eval_func)(item *it, uint32_t hv, int slab_cls);

// TODO: If we eventually want user-loaded modules, we can't use an enum :(
enum crawler_run_type {
CRAWLER_EXPIRED=0, CRAWLER_METADUMP
};

typedef struct {
struct _stritem *next;
struct _stritem *prev;
@@ -432,6 +439,8 @@ typedef struct {
uint8_t slabs_clsid;/* which slab class we're in */
uint8_t nkey; /* key length, w/terminating null and padding */
uint32_t remaining; /* Max keys to crawl per slab per invocation */
enum crawler_run_type type; /* which module to use during run */
crawler_eval_func eval; /* The function to run with the locked item */
} crawler;

/* Header when an item is actually a chunk of another item. */
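The crawler_eval_func typedef and crawler_run_type enum added above are the dispatch point for this prototype: do_lru_crawler_start() in items.c picks the per-item callback from the run type. As a hedged sketch only — CRAWLER_DUMMY and item_crawler_dummy are hypothetical names, not part of this commit — adding another run type would roughly mean extending the enum, supplying an eval callback that drops the reference the crawler holds on the item (as item_crawler_metadump() does), and adding a case to the switch in do_lru_crawler_start():

/* Hypothetical sketch, not in this commit. */
enum crawler_run_type {
    CRAWLER_EXPIRED=0, CRAWLER_METADUMP, CRAWLER_DUMMY
};

/* Per-item callback matching crawler_eval_func; it must release the
 * reference the crawler thread took on the item. */
static void item_crawler_dummy(item *it, uint32_t hv, int i) {
    (void)hv; (void)i;
    refcount_decr(&it->refcount);
}

/* ...and in do_lru_crawler_start(), alongside the existing cases:
 *     case CRAWLER_DUMMY:
 *         crawlers[sid].eval = item_crawler_dummy;
 *         break;
 */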