Skip to content

Commit

Permalink
fix: bugs in stream code (dragonflydb#4239)
Browse files Browse the repository at this point in the history
* fix: bugs in stream code

1. Memory leak in streamGetEdgeID
2. Addresses CVE-2022-31144
3. Fixes XAUTOCLAIM bugs and adds tests.
4. Limits the count argument in XAUTOCLAIM command to 2^18 (CVE-2022-35951)

Also fixes dragonflydb#3830

---------

Signed-off-by: Roman Gershman <[email protected]>
Signed-off-by: Roman Gershman <[email protected]>
Co-authored-by: Shahar Mike <[email protected]>
  • Loading branch information
romange and chakaz authored Dec 2, 2024
1 parent 91aff49 commit dcee9a9
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 4 deletions.
4 changes: 4 additions & 0 deletions src/facade/facade_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ MATCHER_P(RespArray, value, "") {
result_listener);
}

template <typename... Args> auto RespElementsAre(const Args&... matchers) {
return RespArray(::testing::ElementsAre(matchers...));
}

inline bool operator==(const RespExpr& left, std::string_view s) {
return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s;
}
Expand Down
4 changes: 2 additions & 2 deletions src/redis/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i
streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX};
*edge_id = first ? max_id : min_id;
}

streamIteratorStop(&si);
}

/* Trim the stream 's' according to args->trim_strategy, and return the
Expand Down Expand Up @@ -336,7 +336,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
streamDecodeID(ri.key, &master_id);

/* Read last ID. */
streamID last_id;
streamID last_id = {0, 0};
lpGetEdgeStreamID(lp, 0, &master_id, &last_id);

/* We can remove the entire node id its last ID < 'id' */
Expand Down
9 changes: 7 additions & 2 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1568,11 +1568,15 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
streamDecodeID(ri.key, &id);

if (!streamEntryExists(stream, &id)) {
// TODO: to propagate this change to replica as XCLAIM command
// - since we delete it from NACK. See streamPropagateXCLAIM call.
raxRemove(group->pel, ri.key, ri.key_len, nullptr);
raxRemove(nack->consumer->pel, ri.key, ri.key_len, nullptr);
streamFreeNACK(nack);
result.deleted_ids.push_back(id);
raxSeek(&ri, ">=", ri.key, ri.key_len);

count--; /* Count is a limit of the command response size. */
continue;
}

Expand Down Expand Up @@ -1603,6 +1607,7 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl

AppendClaimResultItem(result, stream, id);
count--;
// TODO: propagate xclaim to replica
}

raxNext(&ri);
Expand Down Expand Up @@ -3229,8 +3234,8 @@ void StreamFamily::XAutoClaim(CmdArgList args, const CommandContext& cmd_cntx) {
if (!absl::SimpleAtoi(arg, &opts.count)) {
return rb->SendError(kInvalidIntErr);
}
if (opts.count <= 0) {
return rb->SendError("COUNT must be > 0");
if (opts.count <= 0 || opts.count >= (1L << 18)) {
return rb->SendError("COUNT must be > 0 and less than 2^18");
}
continue;
}
Expand Down
88 changes: 88 additions & 0 deletions src/server/stream_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,94 @@ TEST_F(StreamFamilyTest, XInfoStream) {
"pel-count", IntArg(11), "pending", ArrLen(11)));
}

TEST_F(StreamFamilyTest, AutoClaimPelItemsFromAnotherConsumer) {
auto resp = Run({"xadd", "mystream", "*", "a", "1"});
string id1 = resp.GetString();
resp = Run({"xadd", "mystream", "*", "b", "2"});
string id2 = resp.GetString();
resp = Run({"xadd", "mystream", "*", "c", "3"});
string id3 = resp.GetString();
resp = Run({"xadd", "mystream", "*", "d", "4"});
string id4 = resp.GetString();

Run({"XGROUP", "CREATE", "mystream", "mygroup", "0"});

// Consumer 1 reads item 1 from the stream without acknowledgements.
// Consumer 2 then claims pending item 1 from the PEL of consumer 1
resp = Run(
{"XREADGROUP", "GROUP", "mygroup", "consumer1", "COUNT", "1", "STREAMS", "mystream", ">"});

auto match_a1 = RespElementsAre("a", "1");
ASSERT_THAT(resp, RespElementsAre("mystream", RespElementsAre(RespElementsAre(id1, match_a1))));

AdvanceTime(200); // Advance time to greater time than the idle time in the autoclaim (10)
resp = Run({"XAUTOCLAIM", "mystream", "mygroup", "consumer2", "10", "-", "COUNT", "1"});

EXPECT_THAT(resp, RespElementsAre("0-0", ArrLen(1), ArrLen(0)));
EXPECT_THAT(resp.GetVec()[1], RespElementsAre(RespElementsAre(id1, match_a1)));

Run({"XREADGROUP", "GROUP", "mygroup", "consumer1", "COUNT", "3", "STREAMS", "mystream", ">"});
AdvanceTime(200);

// Delete item 2 from the stream.Now consumer 1 has PEL that contains
// only item 3. Try to use consumer 2 to claim the deleted item 2
// from the PEL of consumer 1, this should return nil
resp = Run({"XDEL", "mystream", id2});
ASSERT_THAT(resp, IntArg(1));

// id1 and id3 are self - claimed here but not id2('count' was set to 3)
// we make sure id2 is indeed skipped(the cursor points to id4)
resp = Run({"XAUTOCLAIM", "mystream", "mygroup", "consumer2", "10", "-", "COUNT", "3"});
auto match_id1_a1 = RespElementsAre(id1, match_a1);
auto match_id3_c3 = RespElementsAre(id3, RespElementsAre("c", "3"));
ASSERT_THAT(resp, RespElementsAre(id4, RespElementsAre(match_id1_a1, match_id3_c3),
RespElementsAre(id2)));
// Delete item 3 from the stream.Now consumer 1 has PEL that is empty.
// Try to use consumer 2 to claim the deleted item 3 from the PEL
// of consumer 1, this should return nil
AdvanceTime(200);

ASSERT_THAT(Run({"XDEL", "mystream", id4}), IntArg(1));

// id1 and id3 are self - claimed here but not id2 and id4('count' is default 100)
// we also test the JUSTID modifier here.note that, when using JUSTID,
// deleted entries are returned in reply(consistent with XCLAIM).
resp = Run({"XAUTOCLAIM", "mystream", "mygroup", "consumer2", "10", "-", "JUSTID"});
ASSERT_THAT(resp, RespElementsAre("0-0", RespElementsAre(id1, id3), RespElementsAre(id4)));
}

TEST_F(StreamFamilyTest, AutoClaimDelCount) {
Run({"xadd", "x", "1-0", "f", "v"});
Run({"xadd", "x", "2-0", "f", "v"});
Run({"xadd", "x", "3-0", "f", "v"});
Run({"XGROUP", "CREATE", "x", "grp", "0"});
auto resp = Run({"XREADGROUP", "GROUP", "grp", "Alice", "STREAMS", "x", ">"});

auto m1 = RespElementsAre("1-0", _);
auto m2 = RespElementsAre("2-0", _);
auto m3 = RespElementsAre("3-0", _);
EXPECT_THAT(resp, RespElementsAre("x", RespElementsAre(m1, m2, m3)));

EXPECT_THAT(Run({"XDEL", "x", "1-0"}), IntArg(1));
EXPECT_THAT(Run({"XDEL", "x", "2-0"}), IntArg(1));

resp = Run({"XAUTOCLAIM", "x", "grp", "Bob", "0", "0-0", "COUNT", "1"});
EXPECT_THAT(resp, RespElementsAre("2-0", ArrLen(0), RespElementsAre("1-0")));

resp = Run({"XAUTOCLAIM", "x", "grp", "Bob", "0", "2-0", "COUNT", "1"});
EXPECT_THAT(resp, RespElementsAre("3-0", ArrLen(0), RespElementsAre("2-0")));

resp = Run({"XAUTOCLAIM", "x", "grp", "Bob", "0", "3-0", "COUNT", "1"});
EXPECT_THAT(resp, RespElementsAre(
"0-0", RespElementsAre(RespElementsAre("3-0", RespElementsAre("f", "v"))),
ArrLen(0)));
resp = Run({"xpending", "x", "grp", "-", "+", "10", "Alice"});
EXPECT_THAT(resp, ArrLen(0));

resp = Run({"XAUTOCLAIM", "x", "grp", "Bob", "0", "3-0", "COUNT", "704505322"});
EXPECT_THAT(resp, ErrArg("COUNT"));
}

TEST_F(StreamFamilyTest, XAddMaxSeq) {
Run({"XADD", "x", "1-18446744073709551615", "f1", "v1"});
auto resp = Run({"XADD", "x", "1-*", "f2", "v2"});
Expand Down

0 comments on commit dcee9a9

Please sign in to comment.