Skip to content

Commit

Permalink
Merge pull request Netflix#150 from Netflix/redis_update_branch
Browse files Browse the repository at this point in the history
Added some provisions for error messages and some messaging functions
  • Loading branch information
shailesh33 committed Aug 11, 2015
2 parents 781688c + 6ae2055 commit 3dee268
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 5 deletions.
103 changes: 103 additions & 0 deletions src/dyn_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ msg_get(struct conn *conn, bool request, int data_store)
}
msg->pre_splitcopy = redis_pre_splitcopy;
msg->post_splitcopy = redis_post_splitcopy;
msg->reply = redis_reply;
msg->failure = redis_failure;
msg->pre_coalesce = redis_pre_coalesce;
msg->post_coalesce = redis_post_coalesce;
} else if (data_store == DATA_MEMCACHE) {
Expand Down Expand Up @@ -1124,3 +1126,104 @@ msg_send(struct context *ctx, struct conn *conn)

return DN_OK;
}

struct mbuf *
msg_ensure_mbuf(struct msg *msg, size_t len)
{
struct mbuf *mbuf;

if (STAILQ_EMPTY(&msg->mhdr) ||
mbuf_size(STAILQ_LAST(&msg->mhdr, mbuf, next)) < len) {
mbuf = mbuf_get();
if (mbuf == NULL) {
return NULL;
}
mbuf_insert(&msg->mhdr, mbuf);
} else {
mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);
}

return mbuf;
}


/*
* Append n bytes of data, with n <= mbuf_size(mbuf)
* into mbuf
*/
rstatus_t
msg_append(struct msg *msg, uint8_t *pos, size_t n)
{
struct mbuf *mbuf;

ASSERT(n <= mbuf_data_size());

mbuf = msg_ensure_mbuf(msg, n);
if (mbuf == NULL) {
return DN_ENOMEM;
}

ASSERT(n <= mbuf_size(mbuf));

mbuf_copy(mbuf, pos, n);
msg->mlen += (uint32_t)n;

return DN_OK;
}

/*
* Prepend n bytes of data, with n <= mbuf_size(mbuf)
* into mbuf
*/
rstatus_t
msg_prepend(struct msg *msg, uint8_t *pos, size_t n)
{
struct mbuf *mbuf;

mbuf = mbuf_get();
if (mbuf == NULL) {
return DN_ENOMEM;
}

ASSERT(n <= mbuf_size(mbuf));

mbuf_copy(mbuf, pos, n);
msg->mlen += (uint32_t)n;

STAILQ_INSERT_HEAD(&msg->mhdr, mbuf, next);

return DN_OK;
}

/*
* Prepend a formatted string into msg. Returns an error if the formatted
* string does not fit in a single mbuf.
*/
rstatus_t
msg_prepend_format(struct msg *msg, const char *fmt, ...)
{
struct mbuf *mbuf;
int n;
uint32_t size;
va_list args;

mbuf = mbuf_get();
if (mbuf == NULL) {
return DN_ENOMEM;
}

size = mbuf_size(mbuf);

va_start(args, fmt);
n = dn_vscnprintf(mbuf->last, size, fmt, args);
va_end(args);
if (n <= 0 || n >= (int)size) {
return DN_ERROR;
}

mbuf->last += n;
msg->mlen += (uint32_t)n;
STAILQ_INSERT_HEAD(&msg->mhdr, mbuf, next);

return DN_OK;
}
8 changes: 8 additions & 0 deletions src/dyn_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
typedef void (*msg_parse_t)(struct msg *);
typedef rstatus_t (*msg_post_splitcopy_t)(struct msg *);
typedef void (*msg_coalesce_t)(struct msg *r);
typedef rstatus_t (*msg_reply_t)(struct msg *r);
typedef bool (*msg_failure_t)(struct msg *r);

typedef enum msg_parse_result {
MSG_PARSE_OK, /* parsing ok */
Expand Down Expand Up @@ -230,6 +232,8 @@ struct msg {
msg_coalesce_t post_coalesce; /* message post-coalesce */

msg_type_t type; /* message type */
msg_reply_t reply; /* generate message reply (example: ping) */
msg_failure_t failure; /* transient failure response? */

uint8_t *key_start; /* key start */
uint8_t *key_end; /* key end */
Expand Down Expand Up @@ -297,6 +301,10 @@ bool msg_empty(struct msg *msg);
rstatus_t msg_recv(struct context *ctx, struct conn *conn);
rstatus_t msg_send(struct context *ctx, struct conn *conn);
uint32_t msg_alloc_msgs(void);
struct mbuf *msg_ensure_mbuf(struct msg *msg, size_t len);
rstatus_t msg_append(struct msg *msg, uint8_t *pos, size_t n);
rstatus_t msg_prepend(struct msg *msg, uint8_t *pos, size_t n);
rstatus_t msg_prepend_format(struct msg *msg, const char *fmt, ...);

struct msg *req_get(struct conn *conn);
void req_put(struct msg *msg);
Expand Down
3 changes: 3 additions & 0 deletions src/proto/dyn_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ void memcache_post_coalesce(struct msg *r);

void redis_parse_req(struct msg *r);
void redis_parse_rsp(struct msg *r);
bool redis_failure(struct msg *r);
void redis_pre_splitcopy(struct mbuf *mbuf, void *arg);
rstatus_t redis_post_splitcopy(struct msg *r);
rstatus_t redis_reply(struct msg *r);
void redis_pre_coalesce(struct msg *r);
void redis_post_coalesce(struct msg *r);
void redis_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg);

#endif
180 changes: 175 additions & 5 deletions src/proto/dyn_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
#include "../dyn_core.h"
#include "dyn_proto.h"

#define RSP_STRING(ACTION) \
ACTION( pong, "+PONG\r\n" ) \

#define DEFINE_ACTION(_var, _str) static struct string rsp_##_var = string(_str);
RSP_STRING( DEFINE_ACTION )
#undef DEFINE_ACTION

/*
* Return true, if the redis command take no key, otherwise
* return false
Expand Down Expand Up @@ -448,14 +455,14 @@ redis_parse_req(struct msg *r)
break;

case SW_INLINE_PING:
if (str3icmp(p, 'i', 'n', 'g')) {
if (str3icmp(p, 'i', 'n', 'g') && p + 4 < b->last) {
p = p + 4;
log_hexdump(LOG_VERB, b->pos, mbuf_length(b),"PING");
r->type = MSG_REQ_REDIS_PING;
p = p + 4;
r->is_read = 1;
state = SW_REQ_TYPE_LF;
goto done;
}
}
else{
log_hexdump(LOG_VERB, b->pos, mbuf_length(b),"PING ERROR %d, %s",p-m,p);
goto error;
Expand Down Expand Up @@ -1901,8 +1908,124 @@ redis_parse_rsp(struct msg *r)
break;

case SW_ERROR:
/* rsp_start <- p */
state = SW_RUNTO_CRLF;
if (r->token == NULL) {
if (ch != '-') {
goto error;
}
/* rsp_start <- p */
r->token = p;
}
if (ch == ' ' || ch == CR) {
m = r->token;
r->token = NULL;
switch (p - m) {

case 4:
/*
* -ERR no such key\r\n
* -ERR syntax error\r\n
* -ERR source and destination objects are the same\r\n
* -ERR index out of range\r\n
*/
if (str4cmp(m, '-', 'E', 'R', 'R')) {
r->type = MSG_RSP_REDIS_ERROR_ERR;
break;
}

/* -OOM command not allowed when used memory > 'maxmemory'.\r\n */
if (str4cmp(m, '-', 'O', 'O', 'M')) {
r->type = MSG_RSP_REDIS_ERROR_OOM;
break;
}

break;

case 5:
/* -BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n" */
if (str5cmp(m, '-', 'B', 'U', 'S', 'Y')) {
r->type = MSG_RSP_REDIS_ERROR_BUSY;
break;
}

break;

case 7:
/* -NOAUTH Authentication required.\r\n */
if (str7cmp(m, '-', 'N', 'O', 'A', 'U', 'T', 'H')) {
r->type = MSG_RSP_REDIS_ERROR_NOAUTH;
break;
}

break;

case 8:
/* rsp: "-LOADING Redis is loading the dataset in memory\r\n" */
if (str8cmp(m, '-', 'L', 'O', 'A', 'D', 'I', 'N', 'G')) {
r->type = MSG_RSP_REDIS_ERROR_LOADING;
break;
}

/* -BUSYKEY Target key name already exists.\r\n */
if (str8cmp(m, '-', 'B', 'U', 'S', 'Y', 'K', 'E', 'Y')) {
r->type = MSG_RSP_REDIS_ERROR_BUSYKEY;
break;
}

/* "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n" */
if (str8cmp(m, '-', 'M', 'I', 'S', 'C', 'O', 'N', 'F')) {
r->type = MSG_RSP_REDIS_ERROR_MISCONF;
break;
}

break;

case 9:
/* -NOSCRIPT No matching script. Please use EVAL.\r\n */
if (str9cmp(m, '-', 'N', 'O', 'S', 'C', 'R', 'I', 'P', 'T')) {
r->type = MSG_RSP_REDIS_ERROR_NOSCRIPT;
break;
}

/* -READONLY You can't write against a read only slave.\r\n */
if (str9cmp(m, '-', 'R', 'E', 'A', 'D', 'O', 'N', 'L', 'Y')) {
r->type = MSG_RSP_REDIS_ERROR_READONLY;
break;
}

break;

case 10:
/* -WRONGTYPE Operation against a key holding the wrong kind of value\r\n */
if (str10cmp(m, '-', 'W', 'R', 'O', 'N', 'G', 'T', 'Y', 'P', 'E')) {
r->type = MSG_RSP_REDIS_ERROR_WRONGTYPE;
break;
}

/* -EXECABORT Transaction discarded because of previous errors.\r\n" */
if (str10cmp(m, '-', 'E', 'X', 'E', 'C', 'A', 'B', 'O', 'R', 'T')) {
r->type = MSG_RSP_REDIS_ERROR_EXECABORT;
break;
}

break;

case 11:
/* -MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n */
if (str11cmp(m, '-', 'M', 'A', 'S', 'T', 'E', 'R', 'D', 'O', 'W', 'N')) {
r->type = MSG_RSP_REDIS_ERROR_MASTERDOWN;
break;
}

/* -NOREPLICAS Not enough good slaves to write.\r\n */
if (str11cmp(m, '-', 'N', 'O', 'R', 'E', 'P', 'L', 'I', 'C', 'A', 'S')) {
r->type = MSG_RSP_REDIS_ERROR_NOREPLICAS;
break;
}

break;
}
state = SW_RUNTO_CRLF;
}
break;

case SW_INTEGER:
Expand Down Expand Up @@ -2230,6 +2353,53 @@ redis_parse_rsp(struct msg *r)

}

/*
* Return true, if redis replies with a transient server failure response,
* otherwise return false
*
* Transient failures on redis are scenarios when it is temporarily
* unresponsive and responds with the following protocol specific error
* reply:
* -OOM, when redis is out-of-memory
* -BUSY, when redis is busy
* -LOADING when redis is loading dataset into memory
*
*/
bool
redis_failure(struct msg *r)
{
ASSERT(!r->request);

switch (r->type) {
case MSG_RSP_REDIS_ERROR_OOM:
case MSG_RSP_REDIS_ERROR_BUSY:
case MSG_RSP_REDIS_ERROR_LOADING:
return true;

default:
break;
}

return false;
}

rstatus_t
redis_reply(struct msg *r)
{
struct msg *response = r->peer;

ASSERT(response != NULL && response->owner != NULL);

switch (r->type) {
case MSG_REQ_REDIS_PING:
return msg_append(response, rsp_pong.data, rsp_pong.len);

default:
NOT_REACHED();
return DN_ERROR;
}
}

/*
* Pre-split copy handler invoked when the request is a multi vector -
* 'mget' or 'del' request and is about to be split into two requests
Expand Down

0 comments on commit 3dee268

Please sign in to comment.