Skip to content

Commit

Permalink
chore(server): reduce iouring exposure across the codebase (dragonfly…
Browse files Browse the repository at this point in the history
…db#342)

Refactor RdbSnapshot and consolidate iouring linux file handling in a single place.
  • Loading branch information
romange authored Oct 2, 2022
1 parent 21feebe commit cf779c0
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 67 deletions.
1 change: 0 additions & 1 deletion src/server/bitops_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "server/error.h"
#include "server/test_utils.h"
#include "server/transaction.h"
#include "util/uring/uring_pool.h"

using namespace testing;
using namespace std;
Expand Down
2 changes: 1 addition & 1 deletion src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "server/server_state.h"
#include "server/string_family.h"
#include "server/transaction.h"
#include "util/uring/uring_fiber_algo.h"
#include "util/fiber_sched_algo.h"

using namespace std;

Expand Down
1 change: 0 additions & 1 deletion src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ extern "C" {
#include "server/conn_context.h"
#include "server/main_service.h"
#include "server/test_utils.h"
#include "util/uring/uring_pool.h"

namespace dfly {

Expand Down
1 change: 0 additions & 1 deletion src/server/generic_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "server/string_family.h"
#include "server/test_utils.h"
#include "server/transaction.h"
#include "util/uring/uring_pool.h"

using namespace testing;
using namespace std;
Expand Down
1 change: 0 additions & 1 deletion src/server/list_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "server/string_family.h"
#include "server/test_utils.h"
#include "server/transaction.h"
#include "util/uring/uring_pool.h"

using namespace testing;
using namespace std;
Expand Down
1 change: 0 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ extern "C" {
#include "server/version.h"
#include "server/zset_family.h"
#include "util/html/sorted_table.h"
#include "util/uring/uring_fiber_algo.h"
#include "util/varz.h"

using namespace std;
Expand Down
9 changes: 0 additions & 9 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -672,15 +672,6 @@ error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_
return error_code{};
}

io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
if (res) {
offset_ += *res;
}

return res;
}

AlignedBuffer::AlignedBuffer(size_t cap, ::io::Sink* upstream)
: capacity_(cap), upstream_(upstream) {
aligned_buf_ = (char*)mi_malloc_aligned(kBufLen, 4_KB);
Expand Down
17 changes: 0 additions & 17 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ extern "C" {
#include "io/io.h"
#include "server/common.h"
#include "server/table.h"
#include "util/uring/uring_file.h"

typedef struct rax rax;
typedef struct streamCG streamCG;
Expand All @@ -24,22 +23,6 @@ namespace dfly {

class EngineShard;

class LinuxWriteWrapper : public io::Sink {
public:
LinuxWriteWrapper(util::uring::LinuxFile* lf) : lf_(lf) {
}

io::Result<size_t> WriteSome(const iovec* v, uint32_t len) final;

std::error_code Close() {
return lf_->Close();
}

private:
util::uring::LinuxFile* lf_;
off_t offset_ = 0;
};

class AlignedBuffer : public ::io::Sink {
public:
using io::Sink::Write;
Expand Down
1 change: 0 additions & 1 deletion src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ extern "C" {
#include "server/engine_shard_set.h"
#include "server/rdb_load.h"
#include "server/test_utils.h"
#include "util/uring/uring_pool.h"

using namespace testing;
using namespace std;
Expand Down
88 changes: 55 additions & 33 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,28 @@ bool IsValidSaveScheduleNibble(string_view time, unsigned int max) {
return min_match <= max;
}

class RdbSnapshot {
// takes ownership over the file.
class LinuxWriteWrapper : public io::Sink {
public:
RdbSnapshot(bool single_shard, uring::LinuxFile* fl)
: file_(fl), linux_sink_(fl), saver_(&linux_sink_, single_shard, kRdbWriteFlags & O_DIRECT) {
LinuxWriteWrapper(uring::LinuxFile* lf) : lf_(lf) {
}

io::Result<size_t> WriteSome(const iovec* v, uint32_t len) final;

std::error_code Close() {
return lf_->Close();
}

error_code Start(const StringVec& lua_scripts);
private:
std::unique_ptr<uring::LinuxFile> lf_;
off_t offset_ = 0;
};

class RdbSnapshot {
public:
RdbSnapshot() {}

error_code Start(bool single_shard, const std::string& path, const StringVec& lua_scripts);
void StartInShard(EngineShard* shard);

error_code SaveBody();
Expand All @@ -173,26 +188,45 @@ class RdbSnapshot {
private:
bool started_ = false;

unique_ptr<uring::LinuxFile> file_;
LinuxWriteWrapper linux_sink_;
RdbSaver saver_;
std::unique_ptr<io::Sink> io_sink_;
std::unique_ptr<RdbSaver> saver_;
RdbTypeFreqMap freq_map_;
};

error_code RdbSnapshot::Start(const StringVec& lua_scripts) {
return saver_.SaveHeader(lua_scripts);

io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
if (res) {
offset_ += *res;
}

return res;
}

error_code RdbSnapshot::Start(bool sharded_snapshot,
const std::string& path, const StringVec& lua_scripts) {
auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666);
if (!res) {
return res.error();
}

io_sink_.reset(new LinuxWriteWrapper(res->release()));
saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, kRdbWriteFlags & O_DIRECT));

return saver_->SaveHeader(lua_scripts);
}

error_code RdbSnapshot::SaveBody() {
return saver_.SaveBody(&freq_map_);
return saver_->SaveBody(&freq_map_);
}

error_code RdbSnapshot::Close() {
return linux_sink_.Close();
// TODO: to solve it in a more elegant way.
return static_cast<LinuxWriteWrapper*>(io_sink_.get())->Close();
}

void RdbSnapshot::StartInShard(EngineShard* shard) {
saver_.StartSnapshotInShard(false, shard);
saver_->StartSnapshotInShard(false, shard);
started_ = true;
}

Expand Down Expand Up @@ -738,28 +772,21 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
auto cb = [&](Transaction* t, EngineShard* shard) {
fs::path shard_file = filename, abs_path = path;
ShardId sid = shard->shard_id();
error_code local_ec;

ExtendFilename(now, sid, &shard_file);
abs_path += shard_file;

VLOG(1) << "Saving to " << abs_path;
auto res = uring::OpenLinux(abs_path.generic_string(), kRdbWriteFlags, 0666);

if (res) {
snapshots[sid].reset(new RdbSnapshot{true, res.value().release()});
auto& snapshot = *snapshots[sid];
local_ec = snapshot.Start(lua_scripts);
if (!local_ec) {
snapshot.StartInShard(shard);
}
} else {
local_ec = res.error();
}

snapshots[sid].reset(new RdbSnapshot);
auto& snapshot = snapshots[sid];
error_code local_ec = snapshot->Start(true, abs_path.generic_string(), lua_scripts);
if (local_ec) {
snapshot.reset(); // Reset to make sure stages won't block on faulty snapshots.

lock_guard lk(mu);
UpdateError(local_ec, &ec);
} else {
snapshot->StartInShard(shard);
}

return OpStatus::OK;
Expand All @@ -771,15 +798,10 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er

ExtendFilename(now, -1, &filename);
path += filename;

auto res = uring::OpenLinux(path.generic_string(), kRdbWriteFlags, 0666);
if (!res) {
return res.error();
}
VLOG(1) << "Saving to " << path;

snapshots[0].reset(new RdbSnapshot{false, res.value().release()});
ec = snapshots[0]->Start(lua_scripts);
snapshots[0].reset(new RdbSnapshot);
ec = snapshots[0]->Start(false, path.generic_string(), lua_scripts);

if (!ec) {
auto cb = [&](Transaction* t, EngineShard* shard) {
Expand Down
1 change: 0 additions & 1 deletion src/server/string_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "server/error.h"
#include "server/test_utils.h"
#include "server/transaction.h"
#include "util/uring/uring_pool.h"

using namespace testing;
using namespace std;
Expand Down

0 comments on commit cf779c0

Please sign in to comment.