Skip to content

Commit

Permalink
Merge branch 'fixDeleteRange' into 'master' (merge request !580)
Browse files Browse the repository at this point in the history
[FIX] fix bugs in truncateBinlogV2 #392
### MR描述
<!--- 详细描述MR的细节 -->

close #392 

- 修复truncatebinlog时,选取了错误的binlog 进行了判断,致使binlog无法回收的问题。
- 暂时删除deleteFilesInRange日志。

### 修改动机和上下文背景
<!--- 为什么需要此修改, 解决了什么问题 -->
<!---如果解决了相关的#issue, 在此处进行关联(#issue, close #issue) -->



### 此MR如何进行测试 ?
<!--- 请描述测试MR的细节 -->
<!--- 包括测试的环境以及执行的测试用例 -->
<!--- 说明 change 如何影响其他部分的代码 etc. -->

### change 类型
<!---你的代码引入了何种类型的change, 在所有关联的复选框前选择"x" -->
- [ ] Bug fix (修复了issue的非侵入式修改)
- [ ] New feature (增加功能的非侵入式修改)
- [ ] Breaking change (修复或者增加特性, 但是会造成现有行为的非预期行为)

### 清单
<!--- 查看下述选项,并进行"x"勾选 -->
<!--- 如果你对所有都不确定, 请随时咨询我们 -->
- [ ] 遵循项目的Code-Style
- [ ] Change 需要文档的修改
- [ ] 我已经进行相关文档的修改
- [ ] 我的MR已经通过的相关流水线测试
  • Loading branch information
takenliu committed Feb 27, 2023
2 parents ab04b4c + 1de9431 commit b70c460
Show file tree
Hide file tree
Showing 16 changed files with 228 additions and 247 deletions.
58 changes: 52 additions & 6 deletions src/tendisplus/commands/debug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,42 @@ class BinlogMetaCommand : public Command {
}
} binlogmetaCommand;

class RecycleStatusCommand : public Command {
public:
RecycleStatusCommand() : Command("recyclestatus", "as") {}

ssize_t arity() const {
return 2;
}

int32_t firstkey() const {
return 1;
}

int32_t lastkey() const {
return 1;
}

int32_t keystep() const {
return 1;
}

Expected<std::string> run(Session* sess) final {
const auto& args = sess->getArgs();
auto storeId = ::tendisplus::stoul(args[1]);
if (!storeId.ok()) {
return storeId.status();
}

auto server = sess->getServerEntry();
if (storeId.value() >= server->getKVStoreCount()) {
return {ErrorCodes::ERR_PARSEOPT, "invalid instance num"};
}

return server->getReplManager()->getRecycleStatus(storeId.value());
}
} recyclestatusCommand;

class BinlogFlushCommand : public Command {
public:
BinlogFlushCommand() : Command("binlogflush", "as") {}
Expand Down Expand Up @@ -3784,6 +3820,7 @@ class reshapeCommand : public Command {
if (!status.ok()) {
return status;
}
LOG(INFO) << "full compact on store:" << storeid << " succ.";
} else {
server->getCompactionStat().isRunning = true;
server->getCompactionStat().startTime = sinceEpoch();
Expand All @@ -3802,6 +3839,7 @@ class reshapeCommand : public Command {
if (!status.ok()) {
return status;
}
LOG(INFO) << "full compact on store:" << i << " succ.";
}
}
return Command::fmtOK();
Expand Down Expand Up @@ -3858,19 +3896,23 @@ class compactRangeCommand : public Command {
args[3] + " must be bigger than " + args[2]};
}

std::string str_start, str_end;
std::string str_start, str_end, target;
if (cf == ColumnFamilyNumber::ColumnFamily_Default) {
RecordKey tmplRk(start, 0, RecordType::RT_DATA_META, "", "");
str_start = tmplRk.prefixChunkid();

RecordKey tmplRk2(end, 0, RecordType::RT_DATA_META, "", "");
str_end = tmplRk2.prefixChunkid();

target = "slots";
} else {
ReplLogKeyV2 tmplRk(start);
str_start = tmplRk.encode();

ReplLogKeyV2 tmplRk2(end);
str_end = tmplRk2.encode();

target = "binlogs";
}

uint64_t i = 0;
Expand All @@ -3893,6 +3935,8 @@ class compactRangeCommand : public Command {
if (!status.ok()) {
return status;
}
LOG(INFO) << "compactRange on store:" << i << " " << target << ": ["
<< start << ", " << end << ")";
}
return Command::fmtOK();
}
Expand Down Expand Up @@ -4013,6 +4057,8 @@ class deleteFilesInRangeGenericCommand : public Command {
RET_IF_ERR_EXPECTED(expDb);
auto s = expDb.value().store->deleteFilesInRangeWithoutBinlog(
ColumnFamilyNumber::ColumnFamily_Default, rkStart, rkEnd);
LOG(INFO) << "deleteFilesInRange on store:" << i << " slots: [" << start
<< ", " << end << "]";
RET_IF_ERR(s);
i++;
}
Expand Down Expand Up @@ -4063,9 +4109,11 @@ class deleteFilesInRangeGenericCommand : public Command {

// binlog in [start, end]
auto rlkStart = ReplLogKeyV2(start).encode();
auto rlkEnd = ReplLogKeyV2(end).encode();
auto rlkEnd = ReplLogKeyV2(end + 1).encode();
auto ret = kvStore->deleteFilesInRangeWithoutBinlog(
ColumnFamilyNumber::ColumnFamily_Binlog, rlkStart, rlkEnd);
LOG(INFO) << "deleteFilesInRange on store:" << storeID << " binlogs: ["
<< start << ", " << end << "]";
RET_IF_ERR(ret);
return Command::fmtOK();
}
Expand Down Expand Up @@ -4227,8 +4275,6 @@ class compactSlotsCommand : public Command {
if (myBegin == UINT32_MAX) {
continue;
}
LOG(INFO) << "compactSlots storeid:" << storeid
<< " beginChunk:" << myBegin << " endChunk:" << myEnd;

auto expdb =
server->getSegmentMgr()->getDb(sess, storeid, mgl::LockMode::LOCK_IS);
Expand All @@ -4254,8 +4300,8 @@ class compactSlotsCommand : public Command {
<< " err:" << s.toString();
return s;
}
LOG(INFO) << "compactSlots end storeid:" << storeid
<< " beginChunk:" << myBegin << " endChunk:" << myEnd;
LOG(INFO) << "compactSlots on store:" << storeid << " slots: [" << myBegin
<< ", " << myEnd << "]";
}
return Command::fmtOK();
}
Expand Down
1 change: 1 addition & 0 deletions src/tendisplus/integrate_test/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func testRestore(m1_ip string, m1_port int, s1_ip string, s1_port int,

cfgArgs["maxbinlogkeepnum"] = "10000"
cfgArgs["minbinlogkeepsec"] = "60"
cfgArgs["truncateBinlogNum"] = "1"

m1_port = util.FindAvailablePort(m1_port)
log.Infof("FindAvailablePort:%d", m1_port)
Expand Down
1 change: 1 addition & 0 deletions src/tendisplus/integrate_test/restoretest.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func testRestore(m1_ip string, m1_port int, m2_ip string, m2_port int, kvstoreco
cfgArgs["kvstorecount"] = strconv.Itoa(kvstorecount)
cfgArgs["requirepass"] = "tendis+test"
cfgArgs["masterauth"] = "tendis+test"
cfgArgs["truncateBinlogNum"] = "1"

m1_port = util.FindAvailablePort(m1_port)
log.Infof("FindAvailablePort:%d", m1_port)
Expand Down
12 changes: 6 additions & 6 deletions src/tendisplus/replication/mpov.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ bool ReplManager::registerIncrSync(asio::ip::tcp::socket sock,
return false;
}
_logRecycStatus[storeId]->isRunning = true;
firstPos = _logRecycStatus[storeId]->firstBinlogId;
firstPos = _logRecycStatus[storeId]->minValidBinlogID;
lastFlushBinlogId = _logRecycStatus[storeId]->lastFlushBinlogId;
}

Expand Down Expand Up @@ -306,14 +306,14 @@ bool ReplManager::registerIncrSyncStatus(uint32_t storeId,
std::shared_ptr<BlockingTcpClient> client) {
std::lock_guard<std::mutex> lk(_mutex);
// takenliu: recycleBinlog use firstPos, and incrSync use binlogPos+1
if (_logRecycStatus[storeId]->firstBinlogId > (binlogPos + 1) &&
_logRecycStatus[storeId]->firstBinlogId !=
if (_logRecycStatus[storeId]->minValidBinlogID > (binlogPos + 1) &&
_logRecycStatus[storeId]->minValidBinlogID !=
_logRecycStatus[storeId]->lastFlushBinlogId) {
std::stringstream ss;
ss << "-ERR invalid binlogPos,storeId:" << storeId
<< ",master firstPos:" << _logRecycStatus[storeId]->firstBinlogId
<< ",slave binlogPos:" << binlogPos << ",lastFlushBinlogId:"
<< _logRecycStatus[storeId]->lastFlushBinlogId;
<< ",master firstPos:" << _logRecycStatus[storeId]->minValidBinlogID
<< ",slave binlogPos:" << binlogPos
<< ",lastFlushBinlogId:" << _logRecycStatus[storeId]->lastFlushBinlogId;
LOG(ERROR) << ss.str();
client->writeLine(ss.str());
return false;
Expand Down
Loading

0 comments on commit b70c460

Please sign in to comment.