Skip to content

Commit

Permalink
fix(server): fix bug in replication on cached mode (dragonflydb#3156)
Browse files Browse the repository at this point in the history
* fix server: fix replication on cached mode

Signed-off-by: adi_holden <[email protected]>
  • Loading branch information
adiholden authored Jun 17, 2024
1 parent 664cba9 commit 805c024
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 33 deletions.
26 changes: 2 additions & 24 deletions src/core/dash_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -1544,31 +1544,9 @@ std::enable_if_t<UV, unsigned> Segment<Key, Value, Policy>::CVCOnBump(uint64_t v
result_bid[result++] = bid;
}
const uint8_t target_bid = BucketIndex(hash);
const auto& target = bucket_[target_bid];
result_bid[result++] = target_bid;
const uint8_t probing_bid = NextBid(target_bid);
const auto& probing = bucket_[probing_bid];

unsigned stash_pos = bid - kBucketNum;
uint8_t fp_hash = hash & kFpMask;

auto find_stash = [&](unsigned, unsigned pos) {
return stash_pos == pos ? slot : BucketType::kNanSlot;
};

auto do_fun = [&result, &result_bid, fp_hash, find_stash](auto& target, auto target_bid,
bool probe) {
if (target.HasStashOverflow()) {
result_bid[result++] = target_bid;
} else {
SlotId slot_id = target.IterateStash(fp_hash, probe, find_stash).second;
if (slot_id != BucketType::kNanSlot) {
result_bid[result++] = target_bid;
}
}
};

do_fun(target, target_bid, false);
do_fun(probing, probing_bid, true);
result_bid[result++] = probing_bid;

return result;
}
Expand Down
10 changes: 5 additions & 5 deletions src/core/dash_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,13 @@ TEST_F(DashTest, BumpUp) {
key = segment_.Key(kSecondStashId, 9);
hash = dt_.DoHash(key);

EXPECT_EQ(2, segment_.CVCOnBump(2, kSecondStashId, 9, hash, touched_bid));
EXPECT_EQ(3, segment_.CVCOnBump(2, kSecondStashId, 9, hash, touched_bid));
EXPECT_EQ(touched_bid[0], kSecondStashId);
// There used to be a bug here because two bucket slots are swapped within BumpUp
// and both of them have version < version_threshold. If we don't return them both
// then full sync phase during replication might fail to capture the changes leading
// to data inconsistencies between master and replica.
// Bumpup will move the key to either its original bucket or a probing bucket.
// Since we can't determine the exact bucket before calling bumpup, CVCOnBump
// returns both the original bucket and the probing bucket.
EXPECT_EQ(touched_bid[1], 0);
EXPECT_EQ(touched_bid[2], 1);

segment_.BumpUp(kSecondStashId, 9, hash, RelaxedBumpPolicy{});
ASSERT_TRUE(key == segment_.Key(0, kSlotNum - 1) || key == segment_.Key(1, kSlotNum - 1));
Expand Down
4 changes: 0 additions & 4 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ async def wait_for_replicas_state(*clients, state="stable_sync", timeout=0.05):
async def test_replication_all(
df_local_factory: DflyInstanceFactory, t_master, t_replicas, seeder_config, stream_target, mode
):
# Temporary disable the test until it passes reliably with cache mode.
if mode:
pytest.skip()

if mode:
mode["maxmemory"] = str(t_master * 256) + "mb"

Expand Down

0 comments on commit 805c024

Please sign in to comment.