Skip to content

Commit

Permalink
fix: Huge entries fail to load outside RDB / replication (dragonflydb…
Browse files Browse the repository at this point in the history
…#4154)

* fix: Huge entries fail to load outside RDB / replication

We have an internal utility tool that we use to deserialize values in
some use cases:

* `RESTORE`
* Cluster slot migration
* `RENAME`, if the source and target shards are different

We [recently](dragonflydb#3760)
changed this area of the code, which caused this regression because that
change only handled RDB / replication streams.

Fixes dragonflydb#4143
  • Loading branch information
chakaz authored Nov 20, 2024
1 parent 36135f5 commit 24a1ec6
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {
" to meet value size.",
" If RAND is specified then value will be set to random hex string in specified size.",
" If SLOTS is specified then create keys only in given slots range.",
" TYPE specifies data type (must be STRING/LIST/SET/HSET/ZSET/JSON), default STRING.",
" TYPE specifies data type (must be STRING/LIST/SET/HASH/ZSET/JSON), default STRING.",
" ELEMENTS specifies how many sub elements if relevant (like entries in a list / set).",
"OBJHIST",
" Prints histogram of object sizes.",
Expand Down
65 changes: 43 additions & 22 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,43 +157,64 @@ class RdbRestoreValue : protected RdbLoaderBase {
const RestoreArgs& args, EngineShard* shard);

private:
std::optional<OpaqueObj> Parse(std::string_view payload);
std::optional<OpaqueObj> Parse(io::Source* source);
int rdb_type_ = -1;
};

// NOTE(review): this span looks like a rendered diff without +/- markers, so two versions of
// Parse are interleaved here: an overload taking std::string_view and an overload taking
// io::Source*. Confirm against the repository which lines belong to which version.
//
// std::string_view overload: wraps `payload` in a local InMemSource, points src_ at it, fetches
// the type id and, if rdbIsObjectTypeDF() accepts it, reads one object with ReadObj().
std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(std::string_view payload) {
InMemSource source(payload);
src_ = &source;
if (io::Result<uint8_t> type_id = FetchType(); type_id && rdbIsObjectTypeDF(type_id.value())) {
OpaqueObj obj;
error_code ec = ReadObj(type_id.value(), &obj); // load the type from the input stream
if (ec) {
LOG(ERROR) << "failed to load data for type id " << (unsigned int)type_id.value();
return std::nullopt;
// io::Source* overload: reads from a caller-owned source, so it can be called repeatedly on the
// same stream. Returns std::nullopt when no valid type id is known or when ReadObj() fails.
std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(io::Source* source) {
src_ = source;
// The type id is fetched only while pending_read_.remaining is zero; it is cached in rdb_type_
// so that later calls made with pending_read_.remaining > 0 reuse it instead of fetching again.
if (pending_read_.remaining == 0) {
io::Result<uint8_t> type_id = FetchType();
if (type_id && rdbIsObjectTypeDF(type_id.value())) {
rdb_type_ = *type_id;
}
}

return std::optional<OpaqueObj>(std::move(obj));
} else {
// rdb_type_ is still -1 when no fetch has stored an accepted type id.
if (rdb_type_ == -1) {
LOG(ERROR) << "failed to load type id from the input stream or type id is invalid";
return std::nullopt;
}

OpaqueObj obj;
error_code ec = ReadObj(rdb_type_, &obj); // load the type from the input stream
if (ec) {
LOG(ERROR) << "failed to load data for type id " << rdb_type_;
return std::nullopt;
}

return std::optional<OpaqueObj>(std::move(obj));
}

std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
std::string_view key, DbSlice& db_slice,
const DbContext& cntx,
const RestoreArgs& args,
EngineShard* shard) {
auto opaque_res = Parse(data);
if (!opaque_res) {
return std::nullopt;
}

InMemSource data_src(data);
PrimeValue pv;
if (auto ec = FromOpaque(*opaque_res, &pv); ec) {
// we failed - report and exit
LOG(WARNING) << "error while trying to save data: " << ec;
return std::nullopt;
}
bool first_parse = true;
do {
auto opaque_res = Parse(&data_src);
if (!opaque_res) {
return std::nullopt;
}

LoadConfig config;
if (first_parse) {
first_parse = false;
} else {
config.append = true;
}
if (pending_read_.remaining > 0) {
config.streamed = true;
}

if (auto ec = FromOpaque(*opaque_res, config, &pv); ec) {
// we failed - report and exit
LOG(WARNING) << "error while trying to read data: " << ec;
return std::nullopt;
}
} while (pending_read_.remaining > 0);

if (auto res = db_slice.AddNew(cntx, key, std::move(pv), args.ExpirationTime()); res) {
res->it->first.SetSticky(args.Sticky());
Expand Down
4 changes: 0 additions & 4 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2676,10 +2676,6 @@ void RdbLoader::FlushAllShards() {
FlushShardAsync(i);
}

std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, CompactObj* pv) {
return RdbLoaderBase::FromOpaque(opaque, LoadConfig{}, pv);
}

std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig config,
CompactObj* pv) {
OpaqueObjLoader visitor(opaque.rdb_type, pv, config);
Expand Down
1 change: 0 additions & 1 deletion src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class RdbLoaderBase {

template <typename T> io::Result<T> FetchInt();

static std::error_code FromOpaque(const OpaqueObj& opaque, CompactObj* pv);
static std::error_code FromOpaque(const OpaqueObj& opaque, LoadConfig config, CompactObj* pv);

io::Result<uint64_t> LoadLen(bool* is_encoded);
Expand Down
41 changes: 40 additions & 1 deletion tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from redis.cluster import RedisCluster
from redis.cluster import ClusterNode
from .proxy import Proxy
from .seeder import SeederBase
from .seeder import StaticSeeder

from . import dfly_args

Expand Down Expand Up @@ -1773,6 +1773,45 @@ async def node1size0():
assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")


@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.asyncio
async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
    """Migrate every slot from node 0 to node 1 and check the captured data is unchanged.

    Two instances are started; node 0 initially owns slots 0-16383 and node 1 owns none.
    Node 0 is seeded with StaticSeeder, a capture is taken, the whole slot range is migrated
    to node 1, and a capture taken from node 1 must equal the one taken from node 0.
    """
    instances = []
    for offset in range(2):
        instances.append(
            df_factory.create(port=BASE_PORT + offset, admin_port=BASE_PORT + offset + 1000)
        )
    df_factory.start_all(instances)

    nodes = []
    for instance in instances:
        nodes.append(await create_node_info(instance))
    source_node = nodes[0]
    target_node = nodes[1]
    source_node.slots = [(0, 16383)]
    target_node.slots = []

    initial_config = json.dumps(generate_config(nodes))
    await push_config(initial_config, [node.admin_client for node in nodes])

    logging.debug("Generating huge containers")
    seeder = StaticSeeder(
        key_target=10,
        data_size=10_000_000,
        collection_size=10_000,
        variance=1,
        samples=1,
        types=["LIST", "HASH", "SET", "ZSET", "STRING"],
    )
    await seeder.run(source_node.client)
    source_data = await StaticSeeder.capture(source_node.client)

    # Schedule the full slot range for migration to the second node's admin port.
    migration = MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], target_node.id)
    source_node.migrations = [migration]
    logging.debug("Migrating slots")
    migration_config = json.dumps(generate_config(nodes))
    await push_config(migration_config, [node.admin_client for node in nodes])

    logging.debug("Waiting for migration to finish")
    await wait_for_status(source_node.admin_client, target_node.id, "FINISHED")

    target_data = await StaticSeeder.capture(target_node.client)
    assert source_data == target_data


def parse_lag(replication_info: str):
lags = re.findall("lag=([0-9]+)\r\n", replication_info)
assert len(lags) == 1
Expand Down
37 changes: 37 additions & 0 deletions tests/dragonfly/generic_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import logging
import pytest
import redis
import asyncio
Expand All @@ -7,6 +8,7 @@
from . import dfly_multi_test_args, dfly_args
from .instance import DflyStartException
from .utility import batch_fill_data, gen_test_data, EnvironCntx
from .seeder import StaticSeeder


@dfly_multi_test_args({"keys_output_limit": 512}, {"keys_output_limit": 1024})
Expand Down Expand Up @@ -168,3 +170,38 @@ async def test_denyoom_commands(df_factory):

# mget should not be rejected
await client.execute_command("mget x")


@pytest.mark.parametrize("type", ["LIST", "HASH", "SET", "ZSET", "STRING"])
@dfly_args({"proactor_threads": 4})
@pytest.mark.asyncio
async def test_rename_huge_values(df_factory, type):
    """Check that a huge value of the given type is unchanged by a chain of RENAMEs.

    A single key of the parametrized `type` is generated with StaticSeeder and captured.
    The key is renamed ten times, then renamed back to its original name, and the capture
    taken afterwards must equal the one taken before.
    """
    df_server = df_factory.create()
    df_server.start()
    client = df_server.client()

    logging.debug(f"Generating huge {type}")
    seeder = StaticSeeder(
        key_target=1,
        data_size=10_000_000,
        collection_size=10_000,
        variance=1,
        samples=1,
        types=[type],
    )
    await seeder.run(client)
    source_data = await StaticSeeder.capture(client)
    logging.debug(f"src {source_data}")

    # Rename multiple times to make sure the key moves between shards
    orig_name = (await client.execute_command("keys *"))[0]
    old_name = orig_name
    new_name = ""
    for i in range(10):
        new_name = f"new:{i}"
        # Key names are passed as separate arguments instead of being interpolated into one
        # command string, so the command does not depend on whitespace splitting or on the
        # key being a plain str.
        await client.execute_command("rename", old_name, new_name)
        old_name = new_name
    # Restore the original name so the final capture is comparable with the first one.
    await client.execute_command("rename", new_name, orig_name)
    target_data = await StaticSeeder.capture(client)

    assert source_data == target_data

0 comments on commit 24a1ec6

Please sign in to comment.