Skip to content

Commit

Permalink
Add msg sending throttling between DCs.
Browse files Browse the repository at this point in the history
Use epoll's level-trigger instead of edge-trigger to make the throttling to work.
  • Loading branch information
timiblossom committed Jan 8, 2015
1 parent 8a77264 commit e968066
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 48 deletions.
14 changes: 14 additions & 0 deletions src/dyn_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ static struct command conf_commands[] = {
conf_set_string,
offsetof(struct conf_pool, env) },

{ string("conn_msg_rate"),
conf_set_num,
offsetof(struct conf_pool, conn_msg_rate)},

null_command
};

Expand Down Expand Up @@ -348,6 +352,8 @@ conf_pool_init(struct conf_pool *cp, struct string *name)

cp->gos_interval = CONF_UNSET_NUM;

cp->conn_msg_rate = CONF_UNSET_NUM;

array_null(&cp->server);
array_null(&cp->dyn_seeds);

Expand Down Expand Up @@ -499,6 +505,8 @@ conf_pool_each_transform(void *elem, void *data)
/* gossip */
sp->g_interval = cp->gos_interval;

set_tokens_earned_per_sec(cp->conn_msg_rate);

log_debug(LOG_VERB, "transform to pool %"PRIu32" '%.*s'", sp->idx,
sp->name.len, sp->name.data);

Expand Down Expand Up @@ -571,6 +579,8 @@ conf_dump(struct conf *cf)
log_debug(LOG_VVERB, " dyn_connections: %d", cp->dyn_connections);

log_debug(LOG_VVERB, " gos_interval: %d", cp->gos_interval);
log_debug(LOG_VVERB, " conn_msg_rate: %d", cp->conn_msg_rate);

log_debug(LOG_VVERB, " secure_server_option: \"%.*s\"",
cp->secure_server_option.len,
cp->secure_server_option.data);
Expand Down Expand Up @@ -1500,6 +1510,10 @@ conf_validate_pool(struct conf *cf, struct conf_pool *cp)
cp->gos_interval = CONF_DEFAULT_GOS_INTERVAL;
}

if (cp->conn_msg_rate == CONF_UNSET_NUM) {
cp->conn_msg_rate = CONF_DEFAULT_CONN_MSG_RATE;
}

if (string_empty(&cp->rack)) {
string_copy_c(&cp->rack, &CONF_DEFAULT_RACK);
log_debug(LOG_INFO, "setting rack to default value:%s", CONF_DEFAULT_RACK);
Expand Down
4 changes: 4 additions & 0 deletions src/dyn_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@
#define CONF_DEFAULT_GOS_INTERVAL 30000 //5000 //in millisec
#define CONF_DEFAULT_PEERS 200

#define CONF_DEFAULT_CONN_MSG_RATE 100 //conn msgs per sec
#define CONF_DEFAULT_CONN_MAX_AVAL_TOKENS 1000

#define CONF_STR_NONE "none"
#define CONF_STR_DC "datacenter"
#define CONF_STR_RACK "rack"
Expand Down Expand Up @@ -139,6 +142,7 @@ struct conf_pool {
struct string pem_key_file;
struct string dc; /* this node's dc */
struct string env; /* aws, google, network, ... */
int conn_msg_rate; /* conn msg per sec */
};


Expand Down
4 changes: 4 additions & 0 deletions src/dyn_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ _conn_get(void)
conn->dnode_secured = 0;
conn->dnode_crypto_state = 0;

conn->same_dc = 1;
conn->avail_tokens = max_allowable_rate();
conn->last_sent = 0;

unsigned char *ase_key = generate_aes_key();
strncpy(conn->aes_key, ase_key, strlen(ase_key)); //generate a new key for each connection

Expand Down
8 changes: 6 additions & 2 deletions src/dyn_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ struct conn {
unsigned eof:1; /* eof? aka passive close? */
unsigned done:1; /* done? aka close? */
unsigned redis:1; /* redis? */
unsigned dnode_server:1; /* dndoe server connection? */
unsigned dnode_server:1; /* dnode server connection? */
unsigned dnode_client:1; /* dnode client? */
unsigned dyn_mode:1; /* is a dyn connection? */
unsigned dnode_secured:1; /* is a secured connection? */
unsigned dnode_crypto_state:1; /* crypto state */
unsigned char aes_key[34]; /* a place holder for AES key */
unsigned char aes_key[50]; //aes_key[34]; /* a place holder for AES key */

unsigned same_dc:1; /* bit to indicate whether a peer conn is same DC */
uint32_t avail_tokens; /* used to throttle the traffics */
uint32_t last_sent; /* ts in sec used to determine the last sent time */
};

TAILQ_HEAD(conn_tqh, conn);
Expand Down
30 changes: 28 additions & 2 deletions src/dyn_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,41 @@ core_timeout(struct context *ctx)
}
}


rstatus_t
core_core(void *arg, uint32_t events)
{
rstatus_t status;
struct conn *conn = arg;
struct context *ctx = conn_to_ctx(conn);


/*
if (!conn->dyn_mode) {
if (conn->client && !conn->proxy) {
struct server_pool *sp = conn->owner;
log_debug(LOG_VERB, "Client : '%.*s'", sp->name);
} else if (!conn->client && !conn->proxy) {
struct server *server = conn->owner;
log_debug(LOG_VERB, "Storage server : '%.*s'", server->name);
} else {
struct server_pool *sp = conn->owner;
log_debug(LOG_VERB, "Proxy : '%.*s'", sp->name);
}
} else {
if (conn->dnode_client && !conn->dnode_server) {
struct server_pool *sp = conn->owner;
log_debug(LOG_VERB, "Dnode client : '%.*s'", sp->name);
} else if (!conn->dnode_client && !conn->dnode_server) {
struct server *server = conn->owner;
log_debug(LOG_VERB, "Dnode peer : '%.*s'", server->name);
} else {
struct server_pool *sp = conn->owner;
log_debug(LOG_VERB, "Dnode server : '%.*s'", sp->name);
}
}
*/

if (conn->dyn_mode) {
log_debug(LOG_VVERB, "event %04"PRIX32" on d_%c %d", events,
conn->dnode_client ? 'c' : (conn->dnode_server ? 's' : 'p'), conn->sd);
Expand All @@ -427,7 +455,6 @@ core_core(void *arg, uint32_t events)
}
}


if (events & EVENT_WRITE) {
status = core_send(ctx, conn);
if (status != DN_OK || conn->done || conn->err) {
Expand All @@ -436,7 +463,6 @@ core_core(void *arg, uint32_t events)
}
}


return DN_OK;
}

Expand Down
36 changes: 12 additions & 24 deletions src/dyn_dnode_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,15 @@ dyn_parse_core(struct msg *r)
return false;
}

log_debug(LOG_DEBUG, "in split");
log_debug(LOG_VVERB, "in split");
r->dyn_state = DYN_START;
r->pos = token;
r->result = MSG_PARSE_REPAIR;
log_hexdump(LOG_DEBUG, b->pos, mbuf_length(b), "split and inspecting req %"PRIu64" "
log_hexdump(LOG_VVERB, b->pos, mbuf_length(b), "split and inspecting req %"PRIu64" "
"res %d type %d state %d", r->id, r->result, r->type,
r->dyn_state);

log_hexdump(LOG_VVERB, b->start, b->last - b->start, "split and inspecting full req %"PRIu64" "
"res %d type %d state %d", r->id, r->result, r->type,
r->dyn_state);
return false;
Expand All @@ -328,11 +332,11 @@ dyn_parse_core(struct msg *r)
r->pos = p;
dmsg->source_address = r->owner->addr;

log_debug(LOG_DEBUG, "at done with p at %d", p);
log_hexdump(LOG_DEBUG, r->pos, b->last - r->pos, "done and inspecting req %"PRIu64" "
log_debug(LOG_VVERB, "at done with p at %d", p);
log_hexdump(LOG_VVERB, r->pos, b->last - r->pos, "done and inspecting req %"PRIu64" "
"res %d type %d state %d", r->id, r->result, r->type,
r->dyn_state);
log_hexdump(LOG_DEBUG, b->start, b->last - b->start, "inspecting req %"PRIu64" "
log_hexdump(LOG_VVERB, b->start, b->last - b->start, "inspecting req %"PRIu64" "
"res %d type %d state %d", r->id, r->result, r->type,
r->dyn_state);

Expand Down Expand Up @@ -425,16 +429,9 @@ dyn_parse_req(struct msg *r)
return;

if (r->redis) {
int8_t *pos = r->pos;
redis_parse_req(r);
if (r->result == MSG_PARSE_ERROR) {
r->state = 0;
r->dyn_state = 0;
r->pos = pos;
return dyn_parse_req(r);
}
return;
return redis_parse_req(r);
}

return memcache_parse_req(r);
}

Expand Down Expand Up @@ -485,19 +482,10 @@ void dyn_parse_rsp(struct msg *r)
mbuf_remove(&r->mhdr, b);
mbuf_put(b);
r->mlen = mbuf_length(decrypted_buf);

}

if (r->redis) {
int8_t *pos = r->pos;
redis_parse_rsp(r);
if (r->result == MSG_PARSE_ERROR) {
r->state = 0;
r->dyn_state = 0;
r->pos = pos;
return redis_parse_rsp(r);
}
return;
return redis_parse_rsp(r);
}

return memcache_parse_rsp(r);
Expand Down
21 changes: 15 additions & 6 deletions src/dyn_dnode_peer.c
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@ static bool is_conn_secured(struct server_pool *sp, struct server *peer_node)
}


static bool is_same_dc(struct server_pool *sp, struct server *peer_node)
{
return string_compare(&sp->dc, &peer_node->dc) == 0;
}


struct conn *
dnode_peer_conn(struct server *server)
{
Expand All @@ -316,12 +322,15 @@ dnode_peer_conn(struct server *server)

//if (server->ns_conn_q < pool->peer_connections) {
if (server->ns_conn_q < 1) {
conn = conn_get_peer(server, false, pool->redis);
if (is_conn_secured(pool, server)) {
conn->dnode_secured = 1;
conn->dnode_crypto_state = 0; //need to do a encryption handshake
}
return conn;
conn = conn_get_peer(server, false, pool->redis);
if (is_conn_secured(pool, server)) {
conn->dnode_secured = 1;
conn->dnode_crypto_state = 0; //need to do a encryption handshake
}

conn->same_dc = is_same_dc(pool, server)? 1 : 0;

return conn;
}

//ASSERT(server->ns_conn_q == pool->peer_connections);
Expand Down
38 changes: 30 additions & 8 deletions src/dyn_dnode_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,37 @@ dnode_req_recv_done(struct context *ctx, struct conn *conn,
dnode_req_local_forward(ctx, conn, msg);
}


struct msg *
dnode_req_send_next(struct context *ctx, struct conn *conn)
{
rstatus_t status;
struct msg *msg, *nmsg; /* current and next message */

ASSERT(!conn->dnode_client && !conn->dnode_server);

//throttling the sending traffics here
if (!conn->same_dc) {
uint32_t now = time(NULL);
if (conn->last_sent != 0) {
uint32_t elapsed_time = now - conn->last_sent;
uint32_t earned_tokens = elapsed_time * tokens_earned_per_sec();
conn->avail_tokens = (conn->avail_tokens + earned_tokens) < max_allowable_rate()?
conn->avail_tokens + earned_tokens : max_allowable_rate();

}

conn->last_sent = now;
if (conn->avail_tokens > 0) {
conn->avail_tokens--;
return req_send_next(ctx, conn);
}

//requeue
status = event_add_out(ctx->evb, conn);

return NULL;
}

return req_send_next(ctx, conn);
}

Expand Down Expand Up @@ -301,13 +324,12 @@ dnode_peer_req_forward(struct context *ctx, struct conn *c_conn, struct conn *p_
ASSERT(c_conn->client);

/* enqueue the message (request) into peer inq */
if (TAILQ_EMPTY(&p_conn->imsg_q)) {
status = event_add_out(ctx->evb, p_conn);
if (status != DN_OK) {
dnode_req_forward_error(ctx, p_conn, msg);
p_conn->err = errno;
return;
}
//if (TAILQ_EMPTY(&p_conn->imsg_q)) {
status = event_add_out(ctx->evb, p_conn);
if (status != DN_OK) {
dnode_req_forward_error(ctx, p_conn, msg);
p_conn->err = errno;
return;
}

uint64_t msg_id = peer_msg_id++;
Expand Down
3 changes: 0 additions & 3 deletions src/dyn_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -821,9 +821,6 @@ msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg)
continue;
}

//log_hexdump(LOG_NOTICE, mbuf->pos, mbuf_length(mbuf),
// "Print out mbuf being sent %d", msg->dmsg == NULL);

mlen = mbuf_length(mbuf);
if ((nsend + mlen) > limit) {
mlen = limit - nsend;
Expand Down
25 changes: 25 additions & 0 deletions src/dyn_setting.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@


#include "dyn_core.h"
#include "dyn_conf.h"

static uint8_t tracking_level = 0;
static uint32_t max_tokens = CONF_DEFAULT_CONN_MAX_AVAL_TOKENS; //max msgs can be seen through a conn
static uint32_t conn_msg_rate = CONF_DEFAULT_CONN_MSG_RATE; //conn msgs per sec


uint8_t get_tracking_level(void)
{
Expand All @@ -29,3 +33,24 @@ void set_tracking_level(uint8_t level)
tracking_level = level;
}

uint32_t max_allowable_rate(void)
{
return max_tokens;
}


void set_max_allowable_rate(uint32_t rate)
{
max_tokens = rate;
}


uint32_t tokens_earned_per_sec(void)
{
return conn_msg_rate;
}

void set_tokens_earned_per_sec(uint32_t tokens_per_sec)
{
conn_msg_rate = tokens_per_sec;
}
5 changes: 5 additions & 0 deletions src/dyn_setting.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
uint8_t get_tracking_level(void);
void set_tracking_level(uint8_t level);

uint32_t max_allowable_rate(void);
void set_max_allowable_rate(uint32_t rate);

uint32_t tokens_earned_per_sec(void);
void set_tokens_earned_per_sec(uint32_t tokens_per_sec);


#endif
Loading

0 comments on commit e968066

Please sign in to comment.