Skip to content

Commit

Permalink
fix: stream bugs (dragonflydb#4240)
Browse files Browse the repository at this point in the history
This PR syncs some of the improvements that were introduced in streams in Redis 7.2.3 OSS.

1. verify xsetid against max deleted id in the stream
2. Implement precise memory measurement of streams for "memory usage" command.
3. Always compact nodes in stream listpacks after creating new nodes.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Dec 3, 2024
1 parent 95f2320 commit 8d343bf
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 30 deletions.
91 changes: 90 additions & 1 deletion src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,95 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) {
return 0;
}

/* This is a helper function with the goal of estimating the memory
* size of a radix tree that is used to store Stream IDs.
*
* Note: to guess the size of the radix tree is not trivial, so we
* approximate it considering 16 bytes of data overhead for each
* key (the ID), and then adding the number of bare nodes, plus some
* overhead due by the data and child pointers. This secret recipe
* was obtained by checking the average radix tree created by real
* workloads, and then adjusting the constants to get numbers that
* more or less match the real memory usage.
*
* Actually the number of nodes and keys may be different depending
* on the insertion speed and thus the ability of the radix tree
* to compress prefixes. */
size_t streamRadixTreeMemoryUsage(rax* rax) {
size_t size = sizeof(*rax);
size = rax->numele * sizeof(streamID);
size += rax->numnodes * sizeof(raxNode);
/* Add a fixed overhead due to the aux data pointer, children, ... */
size += rax->numnodes * sizeof(long) * 30;
return size;
}

size_t MallocUsedStream(stream* s) {
size_t asize = sizeof(*s);
asize += streamRadixTreeMemoryUsage(s->rax_tree);

/* Now we have to add the listpacks. The last listpack is often non
* complete, so we estimate the size of the first N listpacks, and
* use the average to compute the size of the first N-1 listpacks, and
* finally add the real size of the last node. */
raxIterator ri;
raxStart(&ri, s->rax_tree);
raxSeek(&ri, "^", NULL, 0);
size_t lpsize = 0, samples = 0;
while (raxNext(&ri)) {
uint8_t* lp = (uint8_t*)ri.data;
/* Use the allocated size, since we overprovision the node initially. */
lpsize += zmalloc_size(lp);
samples++;
}
if (s->rax_tree->numele <= samples) {
asize += lpsize;
} else {
if (samples)
lpsize /= samples; /* Compute the average. */
asize += lpsize * (s->rax_tree->numele - 1);
/* No need to check if seek succeeded, we enter this branch only
* if there are a few elements in the radix tree. */
raxSeek(&ri, "$", NULL, 0);
raxNext(&ri);
/* Use the allocated size, since we overprovision the node initially. */
asize += zmalloc_size(ri.data);
}
raxStop(&ri);

/* Consumer groups also have a non trivial memory overhead if there
* are many consumers and many groups, let's count at least the
* overhead of the pending entries in the groups and consumers
* PELs. */
if (s->cgroups) {
raxStart(&ri, s->cgroups);
raxSeek(&ri, "^", NULL, 0);
while (raxNext(&ri)) {
streamCG* cg = (streamCG*)ri.data;
asize += sizeof(*cg);
asize += streamRadixTreeMemoryUsage(cg->pel);
asize += sizeof(streamNACK) * raxSize(cg->pel);

/* For each consumer we also need to add the basic data
* structures and the PEL memory usage. */
raxIterator cri;
raxStart(&cri, cg->consumers);
raxSeek(&cri, "^", NULL, 0);
while (raxNext(&cri)) {
const streamConsumer* consumer = (const streamConsumer*)cri.data;
asize += sizeof(*consumer);
asize += sdslen(consumer->name);
asize += streamRadixTreeMemoryUsage(consumer->pel);
/* Don't count NACKs again, they are shared with the
* consumer group PEL. */
}
raxStop(&cri);
}
raxStop(&ri);
}
return asize;
}

inline void FreeObjHash(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingStrMap2:
Expand Down Expand Up @@ -311,7 +400,7 @@ size_t RobjWrapper::MallocUsed(bool slow) const {
case OBJ_ZSET:
return MallocUsedZSet(encoding_, inner_obj_);
case OBJ_STREAM:
return sz_;
return slow ? MallocUsedStream((stream*)inner_obj_) : sz_;

default:
LOG(FATAL) << "Not supported " << type_;
Expand Down
1 change: 0 additions & 1 deletion src/facade/op_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ enum class OpStatus : uint16_t {
SYNTAX_ERR,
BUSY_GROUP,
STREAM_ID_SMALL,
ENTRIES_ADDED_SMALL,
INVALID_NUMERIC_RESULT,
CANCELLED,
AT_LEAST_ONE_KEY,
Expand Down
5 changes: 2 additions & 3 deletions src/facade/reply_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,8 @@ TEST_F(RedisReplyBuilderTest, ErrorReplyBuiltInMessage) {

TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) {
// All these op codes creating the same error message
OpStatus none_unique_codes[] = {OpStatus::ENTRIES_ADDED_SMALL, OpStatus::SKIPPED,
OpStatus::KEY_EXISTS, OpStatus::INVALID_VALUE,
OpStatus::TIMED_OUT, OpStatus::STREAM_ID_SMALL};
OpStatus none_unique_codes[] = {OpStatus::SKIPPED, OpStatus::KEY_EXISTS, OpStatus::INVALID_VALUE,
OpStatus::TIMED_OUT, OpStatus::STREAM_ID_SMALL};
uint64_t error_count = 0;
for (const auto& err : none_unique_codes) {
const std::string_view error_name = StatusToMsg(err);
Expand Down
59 changes: 34 additions & 25 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ string NoGroupOrKey(string_view key, string_view cgroup, string_view suffix = ""
suffix);
}

string LeqTopIdError(string_view cmd_name) {
return absl::StrCat("The ID specified in ", cmd_name,
" is equal or smaller than the target stream top item");
}

inline const uint8_t* SafePtr(MutableSlice field) {
return field.empty() ? reinterpret_cast<const uint8_t*>("")
: reinterpret_cast<const uint8_t*>(field.data());
Expand Down Expand Up @@ -424,23 +429,28 @@ int StreamAppendItem(stream* s, CmdArgList fields, uint64_t now_ms, streamID* ad
* if we need to switch to the next one. 'lp' will be set to NULL if
* the current node is full. */
if (lp != NULL) {
int new_node = 0;
size_t node_max_bytes = kStreamNodeMaxBytes;
if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
if (lp_bytes + totelelen >= node_max_bytes) {
lp = NULL;
new_node = 1;
} else if (kStreamNodeMaxEntries) {
unsigned char* lp_ele = lpFirst(lp);
/* Count both live entries and deleted ones. */
int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp, lp_ele));
if (count >= kStreamNodeMaxEntries) {
/* Shrink extra pre-allocated memory */
lp = lpShrinkToFit(lp);
if (ri.data != lp)
raxInsert(s->rax_tree, ri.key, ri.key_len, lp, NULL);
lp = NULL;
new_node = 1;
}
}

if (new_node) {
/* Shrink extra pre-allocated memory */
lp = lpShrinkToFit(lp);
if (ri.data != lp)
raxInsert(s->rax_tree, ri.key, ri.key_len, lp, NULL);
lp = NULL;
}
}

int flags = 0;
Expand Down Expand Up @@ -1403,7 +1413,7 @@ OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, stri
return OpStatus::OK;
}

OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {
ErrorReply OpXSetId(const OpArgs& op_args, string_view key, const streamID& sid) {
auto& db_slice = op_args.GetDbSlice();
auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it)
Expand All @@ -1415,13 +1425,18 @@ OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {
stream* stream_inst = (stream*)cobj.RObjPtr();
long long entries_added = -1;
streamID max_xdel_id{0, 0};
streamID id = sid;

if (streamCompareID(&id, &stream_inst->max_deleted_entry_id) < 0) {
return ErrorReply{"The ID specified in XSETID is smaller than current max_deleted_entry_id",
"stream_smaller_deleted"};
}

/* If the stream has at least one item, we want to check that the user
* is setting a last ID that is equal or greater than the current top
* item, otherwise the fundamental ID monotonicity assumption is violated. */
if (stream_inst->length > 0) {
streamID maxid;
streamID id = sid;
streamLastValidID(stream_inst, &maxid);

if (streamCompareID(&id, &maxid) < 0) {
Expand All @@ -1430,7 +1445,9 @@ OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {

/* If an entries_added was provided, it can't be lower than the length. */
if (entries_added != -1 && stream_inst->length > uint64_t(entries_added)) {
return OpStatus::ENTRIES_ADDED_SMALL;
return ErrorReply{
"The entries_added specified in XSETID is smaller than the target stream length",
"stream_added_small"};
}
}

Expand Down Expand Up @@ -2557,9 +2574,7 @@ void StreamFamily::XAdd(CmdArgList args, const CommandContext& cmd_cntx) {
}

if (add_result.status() == OpStatus::STREAM_ID_SMALL) {
return cmd_cntx.rb->SendError(
"The ID specified in XADD is equal or smaller than "
"the target stream top item");
return cmd_cntx.rb->SendError(LeqTopIdError("XADD"));
}

return cmd_cntx.rb->SendError(add_result.status());
Expand Down Expand Up @@ -3133,23 +3148,17 @@ void StreamFamily::XSetId(CmdArgList args, const CommandContext& cmd_cntx) {
return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType);
}

facade::ErrorReply reply(OpStatus::OK);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSetId2(t->GetOpArgs(shard), key, parsed_id.val);
reply = OpXSetId(t->GetOpArgs(shard), key, parsed_id.val);
return OpStatus::OK;
};

OpStatus result = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
switch (result) {
case OpStatus::STREAM_ID_SMALL:
return cmd_cntx.rb->SendError(
"The ID specified in XSETID is smaller than the "
"target stream top item");
case OpStatus::ENTRIES_ADDED_SMALL:
return cmd_cntx.rb->SendError(
"The entries_added specified in XSETID is smaller than "
"the target stream length");
default:
return cmd_cntx.rb->SendError(result);
cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
if (reply.status == OpStatus::STREAM_ID_SMALL) {
return cmd_cntx.rb->SendError(LeqTopIdError("XSETID"));
}
return cmd_cntx.rb->SendError(reply);
}

void StreamFamily::XTrim(CmdArgList args, const CommandContext& cmd_cntx) {
Expand Down
22 changes: 22 additions & 0 deletions src/server/stream_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1168,4 +1168,26 @@ TEST_F(StreamFamilyTest, XAddMaxSeq) {
EXPECT_THAT(resp, ErrArg("The ID specified in XADD is equal or smaller"));
}

TEST_F(StreamFamilyTest, XsetIdSmallerMaxDeleted) {
Run({"XADD", "x", "1-1", "a", "1"});
Run({"XADD", "x", "1-2", "b", "2"});
Run({"XADD", "x", "1-3", "c", "3"});
Run({"XDEL", "x", "1-2"});
Run({"XDEL", "x", "1-3"});
auto resp = Run({"XINFO", "stream", "x"});
ASSERT_THAT(resp, ArgType(RespExpr::ARRAY));
auto vec = resp.GetVec();
string max_del_id;
for (unsigned i = 0; i < vec.size(); i += 2) {
if (vec[i] == "max-deleted-entry-id") {
max_del_id = vec[i + 1].GetString();
break;
}
}
EXPECT_EQ(max_del_id, "1-3");

resp = Run({"XSETID", "x", "1-2"});
ASSERT_THAT(resp, ErrArg("smaller"));
}

} // namespace dfly

0 comments on commit 8d343bf

Please sign in to comment.