
Commit

curvebs(client): add slow request metric
Signed-off-by: Hanqing Wu <[email protected]>
wu-hanqing committed Sep 20, 2023
1 parent a7ccaf9 commit 8adae43
Showing 9 changed files with 76 additions and 64 deletions.
7 changes: 7 additions & 0 deletions conf/client.conf
@@ -110,10 +110,17 @@ chunkserver.serverStableThreshold=3
# once a request has been retried more than this many times, its timeout always enters exponential backoff
chunkserver.minRetryTimesForceTimeoutBackoff=5

# ** deprecated, use `chunkserver.slowRequestThresholdMS` instead **
# when an rpc has been retried more than maxRetryTimesBeforeConsiderSuspend times,
# it is marked as a suspended IO and the metric raises an alarm
chunkserver.maxRetryTimesBeforeConsiderSuspend=20

# specifies the retry duration threshold after which a request will be
# marked as a slow request.
chunkserver.slowRequestThresholdMS=45000

#
################# file-level configuration options #############
#
18 changes: 9 additions & 9 deletions src/client/chunk_closure.cpp
@@ -414,15 +414,15 @@ void ClientClosure::OnRetry() {
return;
}

if (!reqDone_->IsSuspendRPC() && reqDone_->GetRetriedTimes() >=
failReqOpt_.chunkserverMaxRetryTimesBeforeConsiderSuspend) {
reqDone_->SetSuspendRPCFlag();
MetricHelper::IncremIOSuspendNum(fileMetric_);
LOG(ERROR) << "IO Retried "
<< failReqOpt_.chunkserverMaxRetryTimesBeforeConsiderSuspend
<< " times, set suspend flag! " << *reqCtx_
<< ", IO id = " << reqDone_->GetIOTracker()->GetID()
<< ", request id = " << reqCtx_->id_;
if (CURVE_UNLIKELY(!reqDone_->IsSlowRequest() &&
(TimeUtility::GetTimeofDayMs() - reqDone_->CreatedMS() >
failReqOpt_.chunkserverSlowRequestThresholdMS))) {
reqDone_->MarkAsSlowRequest();
MetricHelper::IncremSlowRequestNum(fileMetric_);
LOG(ERROR) << "Slow request, " << *reqCtx_
<< ", IO id = " << reqDone_->GetIOTracker()->GetID()
<< ", request id = " << reqCtx_->id_
<< ", request created at " << reqDone_->CreatedMS();
}

PreProcessBeforeRetry(status_, cntlstatus_);
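The core of this change is the check in ClientClosure::OnRetry() above: instead of counting retries against chunkserverMaxRetryTimesBeforeConsiderSuspend, the closure now compares the request's age against chunkserver.slowRequestThresholdMS and flags it as slow exactly once. A minimal standalone sketch of that logic (illustrative names only, not the client's actual types):

#include <chrono>
#include <cstdint>

// Hypothetical stand-in for curve's RequestClosure: it records its creation
// time once and remembers whether it has already been flagged as slow.
struct RetryingRequest {
    uint64_t createdMs;
    bool slowRequest = false;
};

// Wall-clock milliseconds, analogous to TimeUtility::GetTimeofDayMs().
inline uint64_t NowMs() {
    using namespace std::chrono;
    return duration_cast<milliseconds>(
               system_clock::now().time_since_epoch()).count();
}

// Returns true only the first time the request crosses the threshold, so the
// caller can bump the slow-request metric exactly once per request.
bool CheckAndMarkSlow(RetryingRequest* req, uint64_t thresholdMs) {
    if (!req->slowRequest && NowMs() - req->createdMs > thresholdMs) {
        req->slowRequest = true;
        return true;
    }
    return false;
}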
13 changes: 10 additions & 3 deletions src/client/client_config.cpp
@@ -117,9 +117,16 @@ int ClientConfig::Init(const std::string& configpath) {
<< "config no chunkserver.minRetryTimesForceTimeoutBackoff "
<< "using default value";

ret = conf_.GetUInt64Value("chunkserver.maxRetryTimesBeforeConsiderSuspend",
&fileServiceOption_.ioOpt.ioSenderOpt.failRequestOpt.chunkserverMaxRetryTimesBeforeConsiderSuspend); // NOLINT
LOG_IF(ERROR, ret == false) << "config no chunkserver.maxRetryTimesBeforeConsiderSuspend info"; // NOLINT
constexpr const char* kChunkserverSlowRequestThresholdMS =
"chunkserver.slowRequestThresholdMS";
ret = conf_.GetUInt32Value(
kChunkserverSlowRequestThresholdMS,
&fileServiceOption_.ioOpt.ioSenderOpt.failRequestOpt
.chunkserverSlowRequestThresholdMS);
LOG_IF(WARNING, !ret) << "config no `" << kChunkserverSlowRequestThresholdMS
<< "`, use default value "
<< fileServiceOption_.ioOpt.ioSenderOpt.failRequestOpt
.chunkserverSlowRequestThresholdMS;

ret = conf_.GetUInt64Value("global.fileMaxInFlightRPCNum",
&fileServiceOption_.ioOpt.ioSenderOpt.inflightOpt.fileMaxInFlightRPCNum); // NOLINT
29 changes: 14 additions & 15 deletions src/client/client_metric.h
@@ -41,13 +41,11 @@ inline void GetStringValue(std::ostream& os, void* arg) {
os << *static_cast<std::string*>(arg);
}

// suspended IO statistics, collected per file to make problems easier to locate
struct IOSuspendMetric {
// current per-second total count
struct SlowRequestMetric {
bvar::Adder<uint64_t> count;

IOSuspendMetric(const std::string& prefix, const std::string& name)
: count(prefix, name + "_total_count") {}
SlowRequestMetric(const std::string& prefix, const std::string& name)
: count(prefix, name + "_total") {}
};

// per-second statistics
@@ -131,8 +129,8 @@ struct FileMetric {
// qps of get-leader failure retries
PerSecondMetric getLeaderRetryQPS;

// number of suspended IOs on the current file
IOSuspendMetric suspendRPCMetric;
// Number of slow requests
SlowRequestMetric slowRequestMetric;

DiscardMetric discardMetric;

@@ -141,14 +139,15 @@
inflightRPCNum(prefix, filename + "_inflight_rpc_num"),
readSizeRecorder(prefix, filename + "_read_request_size_recoder"),
writeSizeRecorder(prefix, filename + "_write_request_size_recoder"),
discardSizeRecorder(prefix, filename + "_discard_request_size_recoder"), // NOLINT
discardSizeRecorder(prefix,
filename + "_discard_request_size_recoder"),
readRPC(prefix, filename + "_read_rpc"),
writeRPC(prefix, filename + "_write_rpc"),
userRead(prefix, filename + "_read"),
userWrite(prefix, filename + "_write"),
userDiscard(prefix, filename + "_discard"),
getLeaderRetryQPS(prefix, filename + "_get_leader_retry_rpc"),
suspendRPCMetric(prefix, filename + "_suspend_io_num"),
slowRequestMetric(prefix, filename + "_slow_request"),
discardMetric(prefix + filename) {}
};

@@ -487,17 +486,17 @@ class MetricHelper {
}
}

static void IncremIOSuspendNum(FileMetric* fm) {
static void IncremSlowRequestNum(FileMetric* fm) {
if (fm != nullptr) {
fm->suspendRPCMetric.count << 1;
fm->slowRequestMetric.count << 1;
}
}

static void DecremIOSuspendNum(FileMetric* fm) {
static void DecremSlowRequestNum(FileMetric* fm) {
if (fm != nullptr) {
fm->suspendRPCMetric.count.get_value() > 0
? fm->suspendRPCMetric.count << -1
: fm->suspendRPCMetric.count << 0;
fm->slowRequestMetric.count.get_value() > 0
? fm->slowRequestMetric.count << -1
: fm->slowRequestMetric.count << 0;
}
}
};
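The renamed metric keeps the bvar layout of the old IOSuspendMetric; only the exported name changes, roughly from `<file>_suspend_io_num_total_count` to `<file>_slow_request_total`. A simplified sketch of the counter lifecycle implied by this diff (an illustration built directly on bvar, not the client's actual classes):

#include <bvar/bvar.h>
#include <string>

// The adder is exported under roughly "<prefix><name>_total". It is
// incremented when a request is first marked slow and decremented when that
// request finally completes, so the value approximates the number of slow
// requests still in flight rather than a cumulative total.
struct SlowRequestCounter {
    bvar::Adder<uint64_t> count;

    SlowRequestCounter(const std::string& prefix, const std::string& name)
        : count(prefix, name + "_total") {}

    void OnMarkedSlow() { count << 1; }  // cf. MetricHelper::IncremSlowRequestNum

    void OnCompleted() {                 // cf. MetricHelper::DecremSlowRequestNum
        // never let the counter go negative if completion races with detection
        if (count.get_value() > 0) {
            count << -1;
        }
    }
};

Because RequestClosure::Run() (further down in this commit) performs the decrement only for requests that were marked slow, the exported value behaves like a gauge of slow requests that have not yet completed.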
11 changes: 5 additions & 6 deletions src/client/config_info.h
@@ -138,11 +138,6 @@ struct ChunkServerUnstableOption {
* the copyset is marked unstable so that it fetches the leader again before sending its next rpc.
* @chunkserverMinRetryTimesForceTimeoutBackoff:
*   once a request has been retried more than this many times, further retries use exponentially backed-off timeouts
* @chunkserverMaxRetryTimesBeforeConsiderSuspend:
*   an rpc retried more than this many times is considered a suspended IO;
*   rpcs sent down to the chunkserver are retried a very large number of times,
*   so an rpc that keeps failing past this threshold can be regarded as suspended,
*   and an alarm is raised through the metric.
*/
struct FailureRequestOption {
uint32_t chunkserverOPMaxRetry = 3;
@@ -151,7 +146,11 @@ struct FailureRequestOption {
uint64_t chunkserverMaxRPCTimeoutMS = 64000;
uint64_t chunkserverMaxRetrySleepIntervalUS = 64ull * 1000 * 1000;
uint64_t chunkserverMinRetryTimesForceTimeoutBackoff = 5;
uint64_t chunkserverMaxRetryTimesBeforeConsiderSuspend = 20;

// When a request remains outstanding beyond this threshold, it is marked as
// a slow request.
// Default: 45s
uint32_t chunkserverSlowRequestThresholdMS = 45 * 1000;
};

/**
5 changes: 3 additions & 2 deletions src/client/request_closure.cpp
@@ -22,6 +22,7 @@

#include "src/client/request_closure.h"

#include "include/curve_compiler_specific.h"
#include "src/client/io_tracker.h"
#include "src/client/iomanager.h"
#include "src/client/request_context.h"
@@ -31,8 +32,8 @@ namespace client {

void RequestClosure::Run() {
ReleaseInflightRPCToken();
if (suspendRPC_) {
MetricHelper::DecremIOSuspendNum(metric_);
if (CURVE_UNLIKELY(slowRequest_)) {
MetricHelper::DecremSlowRequestNum(metric_);
}
tracker_->HandleResponse(reqCtx_);
}
20 changes: 9 additions & 11 deletions src/client/request_closure.h
@@ -31,6 +31,7 @@
#include "src/client/client_metric.h"
#include "src/client/inflight_controller.h"
#include "src/common/concurrent/concurrent.h"
#include "src/common/timeutility.h"

namespace curve {
namespace client {
@@ -140,20 +141,14 @@ class CURVE_CACHELINE_ALIGNMENT RequestClosure
nextTimeoutMS_ = timeout;
}

/**
* mark the current IO as a suspended IO
*/
void SetSuspendRPCFlag() {
suspendRPC_ = true;
}
bool IsSlowRequest() const { return slowRequest_; }

bool IsSuspendRPC() const {
return suspendRPC_;
}
void MarkAsSlowRequest() { slowRequest_ = true; }

uint64_t CreatedMS() const { return createdMS_; }

private:
// suspended io flag
bool suspendRPC_ = false;
bool slowRequest_ = false;

// whether own inflight count
bool ownInflight_ = false;
@@ -178,6 +173,9 @@

// timeout for the next rpc
uint64_t nextTimeoutMS_ = 0;

// creation time of this closure (in milliseconds)
uint64_t createdMS_ = common::TimeUtility::GetTimeofDayMs();
};

} // namespace client
4 changes: 2 additions & 2 deletions src/client/request_context.h
@@ -64,8 +64,8 @@ struct CURVE_CACHELINE_ALIGNMENT RequestContext {
~RequestContext() = default;

bool Init() {
done_ = new (std::nothrow) RequestClosure(this);
return done_ != nullptr;
done_ = new (std::nothrow) RequestClosure(this);
return done_ != nullptr;
}

void UnInit() {
33 changes: 17 additions & 16 deletions test/client/client_metric_test.cpp
@@ -203,7 +203,7 @@ void cb(CurveAioContext* ctx) {

} // namespace

TEST(MetricTest, SuspendRPC_MetricTest) {
TEST(MetricTest, SlowRequestMetricTest) {
MetaServerOption metaopt;
metaopt.rpcRetryOpt.addrs.push_back(mdsMetaServerAddr);
metaopt.rpcRetryOpt.rpcTimeoutMs = 500;
@@ -232,12 +232,13 @@ TEST(MetricTest, SuspendRPC_MetricTest) {
userinfo.owner = "test";

FileServiceOption opt;
opt.ioOpt.reqSchdulerOpt.
ioSenderOpt.failRequestOpt.chunkserverOPMaxRetry = 50;
opt.ioOpt.reqSchdulerOpt.
ioSenderOpt.failRequestOpt.chunkserverRPCTimeoutMS = 50;
opt.ioOpt.reqSchdulerOpt.
ioSenderOpt.failRequestOpt.chunkserverMaxRPCTimeoutMS = 50;
auto& failRequestOpt = opt.ioOpt.reqSchdulerOpt.ioSenderOpt.failRequestOpt;

// each request is retried up to 50 times, with a 50ms timeout per rpc
failRequestOpt.chunkserverOPMaxRetry = 50;
failRequestOpt.chunkserverRPCTimeoutMS = 50;
failRequestOpt.chunkserverMaxRPCTimeoutMS = 50;
failRequestOpt.chunkserverSlowRequestThresholdMS = 1500;

FileInstance fi;
ASSERT_TRUE(fi.Initialize(filename, mdsclient, userinfo, OpenFlags{}, opt));
@@ -284,7 +285,7 @@ TEST(MetricTest, SuspendRPC_MetricTest) {
ret = fi.Read(buffer, 0, 4096);
ASSERT_EQ(-2, ret);

ASSERT_EQ(fm->suspendRPCMetric.count.get_value(), 0);
ASSERT_EQ(fm->slowRequestMetric.count.get_value(), 0);

mds.EnableNetUnstable(100);

@@ -298,11 +299,11 @@ TEST(MetricTest, SuspendRPC_MetricTest) {
fi.AioWrite(aioctx, UserDataType::RawBuffer);

std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(fm->suspendRPCMetric.count.get_value(), 1);
ASSERT_EQ(fm->slowRequestMetric.count.get_value(), 1);

{
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [](){return flag;});
cv.wait(lk, []() { return flag; });
}

delete[] buffer;
@@ -333,8 +334,8 @@ TEST(MetricTest, MetricHelperTest) {
ASSERT_NO_THROW(MetricHelper::UserLatencyRecord(fm, 0, OpType::READ));
ASSERT_NO_THROW(MetricHelper::IncremInflightRPC(fm));
ASSERT_NO_THROW(MetricHelper::DecremInflightRPC(fm));
ASSERT_NO_THROW(MetricHelper::IncremIOSuspendNum(fm));
ASSERT_NO_THROW(MetricHelper::DecremIOSuspendNum(fm));
ASSERT_NO_THROW(MetricHelper::IncremSlowRequestNum(fm));
ASSERT_NO_THROW(MetricHelper::DecremSlowRequestNum(fm));
ASSERT_NO_THROW(MetricHelper::LatencyRecord(fm, 0, OpType::READ));
ASSERT_NO_THROW(MetricHelper::IncremRedirectRPCCount(fm, OpType::READ));
ASSERT_NO_THROW(MetricHelper::IncremRedirectRPCCount(fm, OpType::WRITE));
@@ -360,8 +361,8 @@ TEST(MetricTest, MetricHelperTest) {
ASSERT_NO_THROW(MetricHelper::UserLatencyRecord(&fm2, 0, OpType::READ));
ASSERT_NO_THROW(MetricHelper::IncremInflightRPC(&fm2));
ASSERT_NO_THROW(MetricHelper::DecremInflightRPC(&fm2));
ASSERT_NO_THROW(MetricHelper::IncremIOSuspendNum(&fm2));
ASSERT_NO_THROW(MetricHelper::DecremIOSuspendNum(&fm2));
ASSERT_NO_THROW(MetricHelper::IncremSlowRequestNum(&fm2));
ASSERT_NO_THROW(MetricHelper::DecremSlowRequestNum(&fm2));
ASSERT_NO_THROW(MetricHelper::LatencyRecord(&fm2, 0, OpType::READ));

ASSERT_NO_THROW(MetricHelper::IncremRedirectRPCCount(&fm2, OpType::READ));
@@ -378,8 +379,8 @@ TEST(MetricTest, MetricHelperTest) {
ASSERT_NO_THROW(MetricHelper::LatencyRecord(nullptr, 0, OpType::READ));
ASSERT_NO_THROW(MetricHelper::IncremInflightRPC(nullptr));
ASSERT_NO_THROW(MetricHelper::DecremInflightRPC(nullptr));
ASSERT_NO_THROW(MetricHelper::IncremIOSuspendNum(nullptr));
ASSERT_NO_THROW(MetricHelper::DecremIOSuspendNum(nullptr));
ASSERT_NO_THROW(MetricHelper::IncremSlowRequestNum(nullptr));
ASSERT_NO_THROW(MetricHelper::DecremSlowRequestNum(nullptr));
}

} // namespace client
