Skip to content

Commit

Permalink
Merge pull request Netflix#735 from Netflix/dev
Browse files Browse the repository at this point in the history
Merge dev into v0.6
  • Loading branch information
smukil authored Oct 30, 2019
2 parents 21280ab + 63e0b4f commit dfd5190
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 7 deletions.
14 changes: 13 additions & 1 deletion src/dyn_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,19 @@ static rstatus_t swallow_extra_rsp(struct msg *req, struct msg *rsp) {

static rstatus_t msg_quorum_rsp_handler(struct context *ctx, struct msg *req,
struct msg *rsp) {
if (req->rspmgr.done) return swallow_extra_rsp(req, rsp);
if (req->rspmgr.done) {
rstatus_t swallow_status = swallow_extra_rsp(req, rsp);
if (is_read_repairs_enabled()) {
struct msg *cleanup_msg = NULL;
// Check if we can delete tombstone metadata.
rstatus_t status = g_clear_repair_md_for_key(ctx, req, &cleanup_msg);
if (status == DN_OK) {
req_forward(ctx, req->owner, cleanup_msg);
}
return DN_NOOPS;
}
return swallow_status;
}
rspmgr_submit_response(&req->rspmgr, rsp);
if (!rspmgr_check_is_done(&req->rspmgr)) return DN_EAGAIN;
// rsp is absorbed by rspmgr. so we can use that variable
Expand Down
1 change: 1 addition & 0 deletions src/dyn_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ static rstatus_t core_dnode_peer_pool_preconnect(struct context *ctx) {
IGNORE_RET_VAL(status);
return status;
}

static rstatus_t core_dnode_peer_init(struct context *ctx) {
/* initialize peers */
THROW_STATUS(dnode_initialize_peers(ctx));
Expand Down
15 changes: 12 additions & 3 deletions src/dyn_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func_msg_rewrite_t g_rewrite_query; /* rewrite query in a msg if necessary *
/* rewrite query as script that updates both data and metadata */
func_msg_rewrite_t g_rewrite_query_with_timestamp_md;
func_msg_repair_t g_make_repair_query; /* Send a repair msg. */
func_clear_repair_md_t g_clear_repair_md_for_key; /* Clear repair metadata for a key */

#define DEFINE_ACTION(_name) string(#_name),
static struct string msg_type_strings[] = {MSG_TYPE_CODEC(DEFINE_ACTION)
Expand Down Expand Up @@ -201,6 +202,7 @@ void set_datastore_ops(void) {
g_rewrite_query = redis_rewrite_query;
g_rewrite_query_with_timestamp_md = redis_rewrite_query_with_timestamp_md;
g_make_repair_query = redis_make_repair_query;
g_clear_repair_md_for_key = redis_clear_repair_md_for_key;
break;
case DATA_MEMCACHE:
g_pre_coalesce = memcache_pre_coalesce;
Expand All @@ -212,6 +214,7 @@ void set_datastore_ops(void) {
g_rewrite_query = memcache_rewrite_query;
g_rewrite_query_with_timestamp_md = memcache_rewrite_query_with_timestamp_md;
g_make_repair_query = memcache_make_repair_query;
g_clear_repair_md_for_key = memcache_clear_repair_md_for_key;
break;
default:
return;
Expand Down Expand Up @@ -606,7 +609,7 @@ void msg_put(struct msg *msg) {
return;
}

if (msg->is_request && msg->awaiting_rsps != 0) {
if (msg->is_request && msg->awaiting_rsps != 0 && msg->expect_datastore_reply !=0) {
log_error("Not freeing req %d, awaiting_rsps = %u", msg->id,
msg->awaiting_rsps);
return;
Expand Down Expand Up @@ -1120,7 +1123,13 @@ static rstatus_t msg_recv_chain(struct context *ctx, struct conn *conn,
msize = (size_t)MIN(msg->dmsg->plen, mbuf->end_extra - mbuf->last);
}

n = conn_recv_data(conn, mbuf->last, msize);
if (msize != 0) {
n = conn_recv_data(conn, mbuf->last, msize);
} else {
// We may have got an event notification even though we received all the data.
// In that case, we don't want to read off the socket again.
n = 0;
}

if (n < 0) {
if (n == DN_EAGAIN) {
Expand All @@ -1135,7 +1144,7 @@ static rstatus_t msg_recv_chain(struct context *ctx, struct conn *conn,

// Only used in encryption case
if (encryption_detected) {
if (n >= msg->dmsg->plen || mbuf->end_extra == mbuf->last) {
if ((n >= msg->dmsg->plen && n != 0) || mbuf->end_extra == mbuf->last) {
// log_debug(LOG_VERB, "About to decrypt this mbuf as it is full or
// eligible!");
struct mbuf *nbuf = NULL;
Expand Down
3 changes: 3 additions & 0 deletions src/dyn_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ typedef rstatus_t (*func_msg_rewrite_t)(struct msg *orig_msg,
struct msg **new_msg_ptr);
typedef rstatus_t (*func_msg_repair_t)(struct context *ctx, struct response_mgr *rspmgr,
struct msg **new_msg_ptr);
typedef rstatus_t (*func_clear_repair_md_t)(struct context *ctx, struct msg *req,
struct msg **new_msg_ptr);
typedef void (*func_init_datastore_t)();

extern func_msg_coalesce_t g_pre_coalesce; /* message pre-coalesce */
Expand All @@ -257,6 +259,7 @@ extern func_msg_rewrite_t
extern func_msg_rewrite_t
g_rewrite_query_with_timestamp_md;
extern func_msg_repair_t g_make_repair_query; /* Create a repair msg. */
extern func_clear_repair_md_t g_clear_repair_md_for_key;

void set_datastore_ops(void);

Expand Down
6 changes: 5 additions & 1 deletion src/dyn_task.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#include "dyn_task.h"
/*
* Dynomite - A thin, distributed replication layer for multi non-distributed
* storages. Copyright (C) 2019 Netflix, Inc.
*/

#include <stdbool.h>

#include "dyn_task.h"
#include "dyn_util.h"

/**
Expand Down
5 changes: 5 additions & 0 deletions src/proto/dyn_memcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -1627,3 +1627,8 @@ rstatus_t memcache_make_repair_query(struct context *ctx, struct response_mgr *r
struct msg **new_msg_ptr) {
return DN_OK;
}

rstatus_t memcache_clear_repair_md_for_key(struct context *ctx, struct msg *req,
struct msg **new_msg_ptr) {
return DN_OK;
}
6 changes: 5 additions & 1 deletion src/proto/dyn_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ rstatus_t memcache_rewrite_query_with_timestamp_md(struct msg *orig_msg,
struct context *ctx, bool *did_rewrite, struct msg **new_msg_ptr);
rstatus_t memcache_make_repair_query(struct context *ctx, struct response_mgr *rspmgr,
struct msg **new_msg_ptr);

rstatus_t memcache_clear_repair_md_for_key(struct context *ctx, struct msg *req,
struct msg **new_msg_ptr);

void redis_parse_req(struct msg *r, struct context *ctx);
void redis_parse_rsp(struct msg *r, struct context *ctx);
Expand All @@ -71,4 +72,7 @@ rstatus_t redis_rewrite_query_with_timestamp_md(struct msg *orig_msg,
struct context *ctx, bool *did_rewrite, struct msg **new_msg_ptr);
rstatus_t redis_make_repair_query(struct context *ctx, struct response_mgr *rspmgr,
struct msg **new_msg_ptr);
rstatus_t redis_clear_repair_md_for_key(struct context *ctx, struct msg *req,
struct msg **new_msg_ptr);

#endif
47 changes: 47 additions & 0 deletions src/proto/dyn_proto_repair.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,53 @@
"end\n\n"\
"return ret\n\r\n"


/**********************************************************************/
/* BEGIN METADATA CLEANUP SCRIPTS */
/**********************************************************************/

// Note: Some of the fields are not necessary but are still kept because the Dynomite
// code creates all scripts with a certain argument format.
// Eg: 'orig_cmd', 'num_fields', etc. are unnecessary for these scripts.
#define CLEANUP_DEL_SCRIPT "$4\r\nEVAL\r\n$415\r\n"\
"local key = KEYS[1]\n"\
"local top_level_add_set = KEYS[2]\n"\
"local top_level_rem_set = KEYS[3]\n"\
"local orig_cmd = ARGV[1]\n"\
"local num_fields = ARGV[2]\n"\
"local cur_ts = ARGV[3]\n\n"\
"local top_level_rem_set_ts = redis.call('ZSCORE', top_level_rem_set, key)\n"\
"if (top_level_rem_set_ts) then\n"\
" if (tonumber(cur_ts) < tonumber(top_level_rem_set_ts)) then\n"\
" return 0\n"\
" end\n"\
" return redis.call('ZREM', top_level_rem_set, key)\n"\
"end\n"\
"return 0\n\r\n"

#define CLEANUP_HDEL_SCRIPT "$4\r\nEVAL\r\n$664\r\n"\
"local key = KEYS[1]\n"\
"local top_level_add_set = KEYS[2]\n"\
"local top_level_rem_set = KEYS[3]\n"\
"local add_set = top_level_add_set .. '_' .. key\n"\
"local rem_set = top_level_rem_set .. '_' .. key\n"\
"local orig_cmd = ARGV[1]\n"\
"local num_fields = ARGV[2]\n"\
"local cur_ts = ARGV[3]\n"\
"local field = ARGV[4]\n\n"\
"local last_seen_ts_in_rem = redis.call('ZSCORE', rem_set, field)\n"\
"if (last_seen_ts_in_rem) then\n"\
" if (tonumber(cur_ts) < tonumber(last_seen_ts_in_rem)) then\n"\
" return 0\n"\
" end\n"\
" local ret = redis.call('ZREM', rem_set, field)\n"\
" local remaining_elems = redis.call('ZCARD', rem_set)\n"\
" if (remaining_elems == 0) then\n"\
" redis.call('ZREM', top_level_rem_set, key)\n"\
" end\n"\
" return ret\n"\
"end\n\r\n"

#define MAX_ARG_FMT_STR_LEN 512

#define ADD_SET_STR "._add-set"
Expand Down
95 changes: 94 additions & 1 deletion src/proto/dyn_redis_repair.c
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ void update_total_num_tokens(struct write_with_ts *src) {
* <total_num_tokens> <script> <args>
*
* where <args> can be elaborated more into:
* <key1>..(<keyN>) <+set> <-set> <orig_cmd> <num_opts> <num_flds> \
* <key1>..(<keyN>) <+set> <-set> <orig_cmd> (<num_opts>) <num_flds> \
* <ts> (<opt1>) .. (<optN>) (<fld1>) (<val1>) (<fldN>) ..
*
* Tokens shown above with parantheses are optional.
Expand Down Expand Up @@ -940,3 +940,96 @@ rstatus_t redis_rewrite_query_with_timestamp_md(struct msg *orig_msg, struct con
if (new_msg != NULL) msg_put(new_msg);
return ret_status;
}

// TODO: Do code cleanup
static rstatus_t create_cleanup_script(struct context *ctx, struct msg *orig_msg,
struct conn *conn, struct msg **new_msg_ptr) {
rstatus_t ret_status;

struct write_with_ts msg_info;
msg_info.keys = NULL;
msg_info.fields = NULL;
msg_info.num_keys = 1;
msg_info.num_values = 0;
msg_info.num_optionals = 0;

msg_info.keys = array_create(msg_info.num_keys, sizeof(struct keypos));
if (msg_info.keys == NULL) goto error;

struct keypos *kpos = (struct keypos*)array_push(msg_info.keys);
struct keypos *orig_kpos = (struct keypos*)array_get(orig_msg->keys, 0);
kpos->start = orig_kpos->start;
kpos->end = orig_kpos->end;
kpos->tag_start = orig_kpos->tag_start;
kpos->tag_end = orig_kpos->tag_end;

msg_info.ts = orig_msg->timestamp;
msg_info.add_set = ADD_SET_STR;
msg_info.rem_set = REM_SET_STR;

msg_type_t orig_msg_type = orig_msg->type;
switch (orig_msg_type) {
case MSG_REQ_REDIS_DEL:
msg_info.rewrite_script = CLEANUP_DEL_SCRIPT;
msg_info.num_fields = 0;
break;
case MSG_REQ_REDIS_ZREM:
case MSG_REQ_REDIS_HDEL:
case MSG_REQ_REDIS_SREM:
msg_info.rewrite_script = CLEANUP_HDEL_SCRIPT;
msg_info.num_fields = 1;
msg_info.fields = array_create(msg_info.num_fields, sizeof(struct argpos));
if (msg_info.fields == NULL) goto error;
struct argpos *field_pos = (struct argpos*)array_push(msg_info.fields);
struct argpos *orig_field_pos = (struct argpos*)array_get(orig_msg->args, 0);
field_pos->start = orig_field_pos->start;
field_pos->end = orig_field_pos->end;
break;
default:
return DN_NOOPS;
break;
}
// TODO: Consider adding a special type for cleanup scripts
msg_info.cmd_type = MSG_UNKNOWN;

update_total_num_tokens(&msg_info);

ret_status = finalize_repair_msg(ctx, conn, &msg_info, new_msg_ptr);

return ret_status;

error:
if (msg_info.keys != NULL) {
array_destroy(msg_info.keys);
}
if (msg_info.fields != NULL) {
array_destroy(msg_info.fields);
}
return DN_ERROR;
}

rstatus_t redis_clear_repair_md_for_key(struct context *ctx, struct msg *req,
struct msg **new_msg_ptr) {
// If we lost a track of the original message type, we cannot proceed.
if (req->orig_msg == NULL) return DN_NOOPS;
msg_type_t orig_msg_type = req->orig_msg->type;

// If the original request wasn't a delete, then we shouldn't clear the metadata.
if (proto_cmd_info[orig_msg_type].is_delete == false) return DN_NOOPS;

// If we haven't received responses from all the replicas yet, we shouldn't clear the MD.
if (++req->rspmgr.good_responses < req->rspmgr.max_responses) return DN_NOOPS;

rstatus_t create_status = create_cleanup_script(
ctx, req->orig_msg, req->owner, new_msg_ptr);

// If we were unsuccessful in creating the script, do nothing.
if (create_status != DN_OK) return DN_NOOPS;

// This is a best effort command, don't attempt to capture any responses to it.
(*new_msg_ptr)->expect_datastore_reply = false;
(*new_msg_ptr)->awaiting_rsps = 0;

//req_forward(ctx, req->owner, cleanup_msg);
return DN_OK;
}

0 comments on commit dfd5190

Please sign in to comment.