Skip to content

Commit

Permalink
feat: add bf.(m)add and bf.(m)exists commands (dragonflydb#2801)
Browse files Browse the repository at this point in the history
Adresses dragonflydb#1275

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Mar 31, 2024
1 parent 1d04683 commit 7f0f624
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 6 deletions.
5 changes: 5 additions & 0 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,11 @@ void CompactObj::SetSBF(uint64_t initial_capacity, double fp_prob, double grow_f
}
}

SBF* CompactObj::GetSBF() const {
DCHECK_EQ(SBF_TAG, taglen_);
return u_.sbf;
}

void CompactObj::SetString(std::string_view str) {
uint8_t mask = mask_ & ~kEncMask;
CHECK(!IsExternal());
Expand Down
1 change: 1 addition & 0 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ class CompactObj {
JsonType* GetJson() const;

void SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor);
SBF* GetSBF() const;

// dest must have at least Size() bytes available
void GetString(char* dest) const;
Expand Down
121 changes: 117 additions & 4 deletions src/server/bloom_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "server/bloom_family.h"

#include "core/bloom.h"
#include "facade/cmd_arg_parser.h"
#include "facade/error.h"
#include "server/command_registry.h"
Expand All @@ -18,16 +19,21 @@ using namespace std;

namespace {

constexpr double kDefaultFpProb = 0.01;
constexpr double kDefaultGrowFactor = 2;
struct SbfParams {
uint32_t init_capacity;
double error;
double grow_factor = 2.0;
double grow_factor = kDefaultGrowFactor;

bool ok() const {
return error > 0 and error < 0.5;
}
};

using AddResult = absl::InlinedVector<OpResult<bool>, 4>;
using ExistsResult = absl::InlinedVector<bool, 4>;

OpStatus OpReserve(const SbfParams& params, const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();
OpResult op_res = db_slice.AddOrFind(op_args.db_cntx, key);
Expand All @@ -42,6 +48,47 @@ OpStatus OpReserve(const SbfParams& params, const OpArgs& op_args, string_view k
return OpStatus::OK;
}

// Returns true, if item was added, false if it was already "present".
OpResult<AddResult> OpAdd(const OpArgs& op_args, string_view key, CmdArgList items) {
auto& db_slice = op_args.shard->db_slice();

OpResult op_res = db_slice.AddOrFind(op_args.db_cntx, key);
if (!op_res)
return op_res.status();
PrimeValue& pv = op_res->it->second;

if (op_res->is_new) {
pv.SetSBF(0, kDefaultFpProb, kDefaultGrowFactor);
} else {
if (op_res->it->second.ObjType() != OBJ_SBF)
return OpStatus::WRONG_TYPE;
}

SBF* sbf = pv.GetSBF();
AddResult result(items.size());
for (size_t i = 0; i < items.size(); ++i) {
result[i] = sbf->Add(ToSV(items[i]));
}
return result;
}

OpResult<ExistsResult> OpExists(const OpArgs& op_args, string_view key, CmdArgList items) {
auto& db_slice = op_args.shard->db_slice();
OpResult op_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_SBF);
if (!op_res)
return op_res.status();
auto it = (*op_res);

const SBF* sbf = it->second.GetSBF();
ExistsResult result(items.size());

for (size_t i = 0; i < items.size(); ++i) {
result[i] = sbf->Exists(ToSV(items[i]));
}

return result;
}

} // namespace

void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) {
Expand Down Expand Up @@ -69,11 +116,75 @@ void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) {
}

void BloomFamily::Add(CmdArgList args, ConnectionContext* cntx) {
cntx->SendError(kSyntaxErr);
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpStatus status = res.status();
if (res) {
if (res->front())
return cntx->SendLong(*res->front());
else
status = res->front().status();
}

return cntx->SendError(status);
}

void BloomFamily::Exists(CmdArgList args, ConnectionContext* cntx) {
cntx->SendError(kSyntaxErr);
string_view key = ArgS(args, 0);
args.remove_prefix(1);
const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
return cntx->SendLong(res ? res->front() : 0);
}

void BloomFamily::MAdd(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (!res) {
return cntx->SendError(res.status());
}
const AddResult& add_res = *res;
RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder();
rb->StartArray(add_res.size());
for (const OpResult<bool>& val : add_res) {
if (val) {
cntx->SendLong(*val);
} else {
cntx->SendError(val.status());
}
}
}

void BloomFamily::MExists(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};

OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));

RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder();
rb->StartArray(args.size());
for (size_t i = 0; i < args.size(); ++i) {
cntx->SendLong(res ? res->at(i) : 0);
}
}

using CI = CommandId;
Expand All @@ -86,7 +197,9 @@ void BloomFamily::Register(CommandRegistry* registry) {
*registry << CI{"BF.RESERVE", CO::WRITE | CO::DENYOOM | CO::FAST, -4, 1, 1, acl::BLOOM}.HFUNC(
Reserve)
<< CI{"BF.ADD", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::BLOOM}.HFUNC(Add)
<< CI{"BF.EXISTS", CO::READONLY | CO::FAST, 3, 1, 1, acl::BLOOM}.HFUNC(Exists);
<< CI{"BF.MADD", CO::WRITE | CO::DENYOOM | CO::FAST, -3, 1, 1, acl::BLOOM}.HFUNC(MAdd)
<< CI{"BF.EXISTS", CO::READONLY | CO::FAST, 3, 1, 1, acl::BLOOM}.HFUNC(Exists)
<< CI{"BF.MEXISTS", CO::READONLY | CO::FAST, -3, 1, 1, acl::BLOOM}.HFUNC(MExists);
};

} // namespace dfly
2 changes: 2 additions & 0 deletions src/server/bloom_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ class BloomFamily {
private:
static void Reserve(CmdArgList args, ConnectionContext* cntx);
static void Add(CmdArgList args, ConnectionContext* cntx);
static void MAdd(CmdArgList args, ConnectionContext* cntx);
static void Exists(CmdArgList args, ConnectionContext* cntx);
static void MExists(CmdArgList args, ConnectionContext* cntx);
};

} // namespace dfly
35 changes: 33 additions & 2 deletions src/server/bloom_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,46 @@

namespace dfly {

using testing::ElementsAre;

class BloomFamilyTest : public BaseFamilyTest {
protected:
};

TEST_F(BloomFamilyTest, Basic) {
auto resp = Run({"bf.reserve", "b1", "0.1", "32"});
EXPECT_EQ(resp, "OK");
resp = Run({"type", "b1"});
EXPECT_EQ(resp, "MBbloom--");
EXPECT_EQ(Run({"type", "b1"}), "MBbloom--");
EXPECT_THAT(Run({"bf.add", "b1", "a"}), IntArg(1));
EXPECT_THAT(Run({"bf.add", "b1", "b"}), IntArg(1));
EXPECT_THAT(Run({"bf.add", "b1", "b"}), IntArg(0));
EXPECT_THAT(Run({"bf.add", "b2", "b"}), IntArg(1));
EXPECT_EQ(Run({"type", "b2"}), "MBbloom--");

EXPECT_THAT(Run({"bf.exists", "b2", "c"}), IntArg(0));
EXPECT_THAT(Run({"bf.exists", "b3", "c"}), IntArg(0));
EXPECT_THAT(Run({"bf.exists", "b2", "b"}), IntArg(1));
Run({"set", "str", "foo"});
EXPECT_THAT(Run({"bf.exists", "str", "b"}), IntArg(0));
}

TEST_F(BloomFamilyTest, Multiple) {
auto resp = Run({"bf.mexists", "bf1", "a", "b", "c"});
EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(0), IntArg(0), IntArg(0))));

Run({"set", "str", "foo"});
resp = Run({"bf.mexists", "str", "a", "b", "c"});
EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(0), IntArg(0), IntArg(0))));

resp = Run({"bf.madd", "str", "a"});
EXPECT_THAT(resp, ErrArg("WRONG"));

resp = Run({"bf.madd", "bf1", "a", "b", "c"});
EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(1), IntArg(1), IntArg(1))));
resp = Run({"bf.madd", "bf1", "a", "b", "c"});
EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(0), IntArg(0), IntArg(0))));
resp = Run({"bf.mexists", "bf1", "a", "b", "c"});
EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(1), IntArg(1), IntArg(1))));
}

} // namespace dfly

0 comments on commit 7f0f624

Please sign in to comment.