Skip to content

Commit

Permalink
chore: serialize SBF (dragonflydb#2846)
Browse files Browse the repository at this point in the history
* chore: serialize SBF

SAVE/LOAD supports SBF now. Also fixes MallocUsed for SBF.

---------

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Apr 9, 2024
1 parent b1e688b commit 57d5676
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@ size_t CompactObj::MallocUsed() const {
}

if (taglen_ == SBF_TAG) {
return 0; // TODO: to track SBF memory utilization.
return u_.sbf->MallocUsed();
}
LOG(DFATAL) << "should not reach";
return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ TEST_F(CompactObjectTest, Hash) {
TEST_F(CompactObjectTest, SBF) {
cobj_.SetSBF(1000, 0.001, 2);
EXPECT_EQ(cobj_.ObjType(), OBJ_SBF);
EXPECT_EQ(0, cobj_.MallocUsed());
EXPECT_GT(cobj_.MallocUsed(), 0);
}

TEST_F(CompactObjectTest, MimallocUnderutilzation) {
Expand Down
4 changes: 3 additions & 1 deletion src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ constexpr uint8_t RDB_TYPE_JSON_OLD = 20;
constexpr uint8_t RDB_TYPE_JSON = 30;
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
constexpr uint8_t RDB_TYPE_SBF = 33;

constexpr bool rdbIsObjectTypeDF(uint8_t type) {
return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) ||
(type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY);
(type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) ||
(type == RDB_TYPE_SBF);
}

// Opcodes: Range 200-240 is used by DF extensions.
Expand Down
48 changes: 48 additions & 0 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ extern "C" {
#include "base/endian.h"
#include "base/flags.h"
#include "base/logging.h"
#include "core/bloom.h"
#include "core/json/json_object.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
Expand Down Expand Up @@ -382,6 +383,7 @@ class RdbLoaderBase::OpaqueObjLoader {
void operator()(const base::PODArray<char>& str);
void operator()(const LzfString& lzfstr);
void operator()(const unique_ptr<LoadTrace>& ptr);
void operator()(const RdbSBF& src);

std::error_code ec() const {
return ec_;
Expand Down Expand Up @@ -466,6 +468,16 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr
}
}

void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbSBF& src) {
SBF* sbf =
CompactObj::AllocateMR<SBF>(src.grow_factor, src.fp_prob, src.max_capacity, src.prev_size,
src.current_size, CompactObj::memory_resource());
for (unsigned i = 0; i < src.filters.size(); ++i) {
sbf->AddFilter(src.filters[i].blob, src.filters[i].hash_cnt);
}
pv_->SetSBF(sbf);
}

void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
size_t len = ltrace->blob_count();

Expand Down Expand Up @@ -1367,6 +1379,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
case RDB_TYPE_MODULE_2:
iores = ReadRedisJson();
break;
case RDB_TYPE_SBF:
iores = ReadSBF();
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;

Expand Down Expand Up @@ -1827,6 +1842,39 @@ auto RdbLoaderBase::ReadJson() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(dest), RDB_TYPE_JSON};
}

auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
RdbSBF res;
uint64_t options;
SET_OR_UNEXPECT(LoadLen(nullptr), options);
if (options != 0)
return Unexpected(errc::rdb_file_corrupted);
SET_OR_UNEXPECT(FetchBinaryDouble(), res.grow_factor);
SET_OR_UNEXPECT(FetchBinaryDouble(), res.fp_prob);
if (res.fp_prob <= 0 || res.fp_prob > 0.5) {
return Unexpected(errc::rdb_file_corrupted);
}
SET_OR_UNEXPECT(LoadLen(nullptr), res.prev_size);
SET_OR_UNEXPECT(LoadLen(nullptr), res.current_size);
SET_OR_UNEXPECT(LoadLen(nullptr), res.max_capacity);

unsigned num_filters = 0;
SET_OR_UNEXPECT(LoadLen(nullptr), num_filters);
auto is_power2 = [](size_t n) { return (n & (n - 1)) == 0; };

for (unsigned i = 0; i < num_filters; ++i) {
unsigned hash_cnt;
string filter_data;
SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt);
SET_OR_UNEXPECT(FetchGenericString(), filter_data);
size_t bit_len = filter_data.size() * 8;
if (!is_power2(bit_len)) { // must be power of two
return Unexpected(errc::rdb_file_corrupted);
}
res.filters.emplace_back(hash_cnt, std::move(filter_data));
}
return OpaqueObj{std::move(res), RDB_TYPE_SBF};
}

template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
auto ec = EnsureRead(sizeof(T));
if (ec)
Expand Down
17 changes: 16 additions & 1 deletion src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,22 @@ class RdbLoaderBase {
uint64_t uncompressed_len;
};

struct RdbSBF {
double grow_factor, fp_prob;
size_t prev_size, current_size;
size_t max_capacity;

struct Filter {
unsigned hash_cnt;
std::string blob;
Filter(unsigned h, std::string b) : hash_cnt(h), blob(std::move(b)) {
}
};
std::vector<Filter> filters;
};

using RdbVariant =
std::variant<long long, base::PODArray<char>, LzfString, std::unique_ptr<LoadTrace>>;
std::variant<long long, base::PODArray<char>, LzfString, std::unique_ptr<LoadTrace>, RdbSBF>;

struct OpaqueObj {
RdbVariant obj;
Expand Down Expand Up @@ -131,6 +145,7 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadStreams();
::io::Result<OpaqueObj> ReadRedisJson();
::io::Result<OpaqueObj> ReadJson();
::io::Result<OpaqueObj> ReadSBF();

std::error_code SkipModuleData();
std::error_code HandleCompressedBlob(int op_type);
Expand Down
29 changes: 29 additions & 0 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C" {

#include "base/flags.h"
#include "base/logging.h"
#include "core/bloom.h"
#include "core/json/json_object.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
Expand Down Expand Up @@ -194,6 +195,8 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
case OBJ_JSON:
return RDB_TYPE_JSON; // save with RDB_TYPE_JSON, deprecate RDB_TYPE_JSON_OLD after July
// 2024.
case OBJ_SBF:
return RDB_TYPE_SBF;
}
LOG(FATAL) << "Unknown encoding " << compact_enc << " for type " << type;
return 0; /* avoid warning */
Expand Down Expand Up @@ -395,6 +398,10 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
return SaveJsonObject(pv);
}

if (obj_type == OBJ_SBF) {
return SaveSBFObject(pv);
}

LOG(ERROR) << "Not implemented " << obj_type;
return make_error_code(errc::function_not_supported);
}
Expand Down Expand Up @@ -620,6 +627,28 @@ error_code RdbSerializer::SaveJsonObject(const PrimeValue& pv) {
return SaveString(json_string);
}

std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) {
SBF* sbf = pv.GetSBF();

// options to allow format mutations in the future.
RETURN_ON_ERR(SaveLen(0)); // options - reserved
RETURN_ON_ERR(SaveBinaryDouble(sbf->grow_factor()));
RETURN_ON_ERR(SaveBinaryDouble(sbf->fp_probability()));
RETURN_ON_ERR(SaveLen(sbf->prev_size()));
RETURN_ON_ERR(SaveLen(sbf->current_size()));
RETURN_ON_ERR(SaveLen(sbf->max_capacity()));
RETURN_ON_ERR(SaveLen(sbf->num_filters()));

for (unsigned i = 0; i < sbf->num_filters(); ++i) {
RETURN_ON_ERR(SaveLen(sbf->hashfunc_cnt(i)));

string_view blob = sbf->data(i);
RETURN_ON_ERR(SaveString(blob));
}

return {};
}

/* Save a long long value as either an encoded string or a string. */
error_code RdbSerializer::SaveLongLongAsString(int64_t value) {
uint8_t buf[32];
Expand Down
1 change: 1 addition & 0 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class RdbSerializer : public SerializerBase {
std::error_code SaveZSetObject(const PrimeValue& pv);
std::error_code SaveStreamObject(const PrimeValue& obj);
std::error_code SaveJsonObject(const PrimeValue& pv);
std::error_code SaveSBFObject(const PrimeValue& pv);

std::error_code SaveLongLongAsString(int64_t value);
std::error_code SaveBinaryDouble(double val);
Expand Down
8 changes: 8 additions & 0 deletions src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -534,4 +534,12 @@ TEST_F(RdbTest, RedisJson) {
"{\"company\":\"DragonflyDB\",\"product\":\"Dragonfly\",\"website\":\"https://"
"dragondlydb.io\",\"years-active\":[2021,2022,2023,2024,\"and more!\"]}");
}

TEST_F(RdbTest, SBF) {
EXPECT_THAT(Run({"BF.ADD", "k", "1"}), IntArg(1));
Run({"debug", "reload"});
EXPECT_EQ(Run({"type", "k"}), "MBbloom--");
EXPECT_THAT(Run({"BF.EXISTS", "k", "1"}), IntArg(1));
}

} // namespace dfly

0 comments on commit 57d5676

Please sign in to comment.