Skip to content

Commit 695a35c

Browse files
committed
Implement grpc timeout
1 parent a6ccc96 commit 695a35c

9 files changed

+94
-22
lines changed

src/brpc/channel.cpp

+7-7
Original file line numberDiff line numberDiff line change
@@ -493,12 +493,12 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
493493
// Setup timer for backup request. When it occurs, we'll setup a
494494
// timer of timeout_ms before sending backup request.
495495

496-
// _abstime_us is for truncating _connect_timeout_ms and resetting
496+
// _abstime_ns is for truncating _connect_timeout_ms and resetting
497497
// timer when EBACKUPREQUEST occurs.
498498
if (cntl->timeout_ms() < 0) {
499-
cntl->_abstime_us = -1;
499+
cntl->_abstime_ns = -1;
500500
} else {
501-
cntl->_abstime_us = cntl->timeout_ms() * 1000L + start_send_real_us;
501+
cntl->_abstime_ns = cntl->timeout_ms() * 1000000L + start_send_real_us * 1000L;
502502
}
503503
const int rc = bthread_timer_add(
504504
&cntl->_timeout_id,
@@ -512,18 +512,18 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
512512
} else if (cntl->timeout_ms() >= 0) {
513513
// Setup timer for RPC timetout
514514

515-
// _abstime_us is for truncating _connect_timeout_ms
516-
cntl->_abstime_us = cntl->timeout_ms() * 1000L + start_send_real_us;
515+
// _abstime_ns is for truncating _connect_timeout_ms
516+
cntl->_abstime_ns = cntl->timeout_ms() * 1000000L + start_send_real_us * 1000L;
517517
const int rc = bthread_timer_add(
518518
&cntl->_timeout_id,
519-
butil::microseconds_to_timespec(cntl->_abstime_us),
519+
butil::nanoseconds_to_timespec(cntl->_abstime_ns),
520520
HandleTimeout, (void*)correlation_id.value);
521521
if (BAIDU_UNLIKELY(rc != 0)) {
522522
cntl->SetFailed(rc, "Fail to add timer for timeout");
523523
return cntl->HandleSendFailed();
524524
}
525525
} else {
526-
cntl->_abstime_us = -1;
526+
cntl->_abstime_ns = -1;
527527
}
528528

529529
cntl->IssueRPC(start_send_real_us);

src/brpc/controller.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ void Controller::ResetPods() {
222222
_timeout_ms = UNSET_MAGIC_NUM;
223223
_backup_request_ms = UNSET_MAGIC_NUM;
224224
_connect_timeout_ms = UNSET_MAGIC_NUM;
225-
_abstime_us = -1;
225+
_abstime_ns = -1;
226226
_timeout_id = 0;
227227
_begin_time_us = 0;
228228
_end_time_us = 0;
@@ -568,7 +568,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
568568
if (timeout_ms() >= 0) {
569569
rc = bthread_timer_add(
570570
&_timeout_id,
571-
butil::microseconds_to_timespec(_abstime_us),
571+
butil::nanoseconds_to_timespec(_abstime_ns),
572572
HandleTimeout, (void*)_correlation_id.value);
573573
}
574574
if (rc != 0) {
@@ -1111,10 +1111,10 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
11111111
timespec connect_abstime;
11121112
timespec* pabstime = NULL;
11131113
if (_connect_timeout_ms > 0) {
1114-
if (_abstime_us >= 0) {
1115-
connect_abstime = butil::microseconds_to_timespec(
1116-
std::min(_connect_timeout_ms * 1000L + start_realtime_us,
1117-
_abstime_us));
1114+
if (_abstime_ns >= 0) {
1115+
connect_abstime = butil::nanoseconds_to_timespec(
1116+
std::min(_connect_timeout_ms * 1000000L + start_realtime_us * 1000L,
1117+
_abstime_ns));
11181118
} else {
11191119
connect_abstime = butil::microseconds_to_timespec(
11201120
_connect_timeout_ms * 1000L + start_realtime_us);

src/brpc/controller.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,10 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
480480
// Get sock option. .e.g get vip info through ttm kernel module hook,
481481
int GetSockOption(int level, int optname, void* optval, socklen_t* optlen);
482482

483+
// Get deadline of this RPC (since the Epoch in nanoseconds).
484+
// -1 means no deadline.
485+
int64_t deadline_ns() const { return _abstime_ns; }
486+
483487
private:
484488
struct CompletionInfo {
485489
CallId id; // call_id of the corresponding request
@@ -662,8 +666,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
662666
int32_t _timeout_ms;
663667
int32_t _connect_timeout_ms;
664668
int32_t _backup_request_ms;
665-
// Deadline of this RPC (since the Epoch in microseconds).
666-
int64_t _abstime_us;
669+
// Deadline of this RPC (since the Epoch in nanoseconds).
670+
int64_t _abstime_ns;
667671
// Timer registered to trigger RPC timeout event
668672
bthread_timer_t _timeout_id;
669673

src/brpc/details/controller_private_accessor.h

+4
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ class ControllerPrivateAccessor {
128128
std::string& protocol_param() { return _cntl->protocol_param(); }
129129
const std::string& protocol_param() const { return _cntl->protocol_param(); }
130130

131+
void set_deadline_ns(int64_t timeout_ns) {
132+
_cntl->_abstime_ns = butil::gettimeofday_us() * 1000L + timeout_ns;
133+
}
134+
131135
private:
132136
Controller* _cntl;
133137
};

src/brpc/parallel_channel.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -658,18 +658,18 @@ void ParallelChannel::CallMethod(
658658
cntl->set_timeout_ms(_options.timeout_ms);
659659
}
660660
if (cntl->timeout_ms() >= 0) {
661-
cntl->_abstime_us = cntl->timeout_ms() * 1000L + cntl->_begin_time_us;
661+
cntl->_abstime_ns = cntl->timeout_ms() * 1000000L + cntl->_begin_time_us * 1000L;
662662
// Setup timer for RPC timetout
663663
const int rc = bthread_timer_add(
664664
&cntl->_timeout_id,
665-
butil::microseconds_to_timespec(cntl->_abstime_us),
665+
butil::nanoseconds_to_timespec(cntl->_abstime_ns),
666666
HandleTimeout, (void*)cid.value);
667667
if (rc != 0) {
668668
cntl->SetFailed(rc, "Fail to add timer");
669669
goto FAIL;
670670
}
671671
} else {
672-
cntl->_abstime_us = -1;
672+
cntl->_abstime_ns = -1;
673673
}
674674
d->SaveThreadInfoOfCallsite();
675675
CHECK_EQ(0, bthread_id_unlock(cid));

src/brpc/policy/http_rpc_protocol.cpp

+36-1
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ CommonStrings::CommonStrings()
135135
, GRPC_ACCEPT_ENCODING_VALUE("identity,gzip")
136136
, GRPC_STATUS("grpc-status")
137137
, GRPC_MESSAGE("grpc-message")
138+
, GRPC_TIMEOUT("grpc-timeout")
138139
{}
139140

140141
static CommonStrings* common = NULL;
@@ -1190,6 +1191,31 @@ void EndRunningCallMethodInPool(
11901191
::google::protobuf::Message* response,
11911192
::google::protobuf::Closure* done);
11921193

1194+
static int64_t ConvertGrpcTimeoutToNS(int64_t timeout_value, const char timeout_unit) {
1195+
switch (timeout_unit) {
1196+
case 'H':
1197+
timeout_value *= (3600 * 1000000000L);
1198+
break;
1199+
case 'M':
1200+
timeout_value *= (60 * 1000000000L);
1201+
break;
1202+
case 'S':
1203+
timeout_value *= 1000000000L;
1204+
break;
1205+
case 'm':
1206+
timeout_value *= 1000000L;
1207+
break;
1208+
case 'u':
1209+
timeout_value *= 1000L;
1210+
case 'n':
1211+
break;
1212+
default:
1213+
return -1;
1214+
}
1215+
return timeout_value;
1216+
}
1217+
1218+
11931219
void ProcessHttpRequest(InputMessageBase *msg) {
11941220
const int64_t start_parse_us = butil::cpuwide_time_us();
11951221
DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
@@ -1418,6 +1444,15 @@ void ProcessHttpRequest(InputMessageBase *msg) {
14181444
return;
14191445
}
14201446
}
1447+
const std::string* grpc_timeout = req_header.GetHeader(common->GRPC_TIMEOUT);
1448+
if (grpc_timeout) {
1449+
const char timeout_unit = grpc_timeout->back();
1450+
int64_t timeout_value_ns =
1451+
ConvertGrpcTimeoutToNS((int64_t)strtol(grpc_timeout->data(), NULL, 10), timeout_unit);
1452+
if (timeout_value_ns >= 0) {
1453+
accessor.set_deadline_ns(timeout_value_ns);
1454+
}
1455+
}
14211456
}
14221457
} else {
14231458
encoding = req_header.GetHeader(common->CONTENT_ENCODING);
@@ -1455,7 +1490,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
14551490
// A http server, just keep content as it is.
14561491
cntl->request_attachment().swap(req_body);
14571492
}
1458-
1493+
14591494
google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
14601495
imsg_guard.reset(); // optional, just release resourse ASAP
14611496

src/brpc/policy/http_rpc_protocol.h

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ struct CommonStrings {
7171
std::string GRPC_ACCEPT_ENCODING_VALUE;
7272
std::string GRPC_STATUS;
7373
std::string GRPC_MESSAGE;
74+
std::string GRPC_TIMEOUT;
7475

7576
CommonStrings();
7677
};

test/brpc_grpc_protocol_unittest.cpp

+30-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "brpc/server.h"
2121
#include "brpc/channel.h"
2222
#include "brpc/grpc.h"
23+
#include "butil/time.h"
2324
#include "grpc.pb.h"
2425

2526
int main(int argc, char* argv[]) {
@@ -64,6 +65,10 @@ class MyGrpcService : public ::test::GrpcService {
6465
cntl->SetFailed(brpc::EINTERNAL, "%s", g_prefix.c_str());
6566
return;
6667
}
68+
if (req->has_timeout_ns()) {
69+
EXPECT_NEAR(cntl->deadline_ns() / 1000000000L,
70+
butil::gettimeofday_s() + req->timeout_ns() / 1000000000L, 1);
71+
}
6772
}
6873

6974
void MethodTimeOut(::google::protobuf::RpcController* cntl_base,
@@ -77,7 +82,6 @@ class MyGrpcService : public ::test::GrpcService {
7782
}
7883
};
7984

80-
8185
class GrpcTest : public ::testing::Test {
8286
protected:
8387
GrpcTest() {
@@ -198,4 +202,29 @@ TEST_F(GrpcTest, MethodNotExist) {
198202
ASSERT_TRUE(butil::StringPiece(cntl.ErrorText()).ends_with("Method MethodNotExist() not implemented."));
199203
}
200204

205+
TEST_F(GrpcTest, GrpcTimeOut) {
206+
const char* timeouts[] = {
207+
"2H", "7200000000000",
208+
"3M", "180000000000",
209+
"+1S", "1000000000",
210+
"4m", "4000000",
211+
"5u", "5000",
212+
"6n", "6"
213+
};
214+
215+
for (size_t i = 0; i < arraysize(timeouts); i = i + 2) {
216+
test::GrpcRequest req;
217+
test::GrpcResponse res;
218+
brpc::Controller cntl;
219+
req.set_message(g_req);
220+
req.set_gzip(false);
221+
req.set_return_error(false);
222+
req.set_timeout_ns((int64_t)(strtol(timeouts[i+1], NULL, 10)));
223+
cntl.http_request().SetHeader("grpc-timeout", timeouts[i]);
224+
test::GrpcService_Stub stub(&_channel);
225+
stub.Method(&cntl, &req, &res, NULL);
226+
EXPECT_FALSE(cntl.Failed());
227+
}
228+
}
229+
201230
} // namespace

test/grpc.proto

+1-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ message GrpcRequest {
77
required string message = 1;
88
required bool gzip = 2;
99
required bool return_error = 3;
10+
optional int64 timeout_ns = 4;
1011
};
1112

1213
message GrpcResponse {
@@ -18,5 +19,3 @@ service GrpcService {
1819
rpc MethodTimeOut(GrpcRequest) returns (GrpcResponse);
1920
rpc MethodNotExist(GrpcRequest) returns (GrpcResponse);
2021
}
21-
22-

0 commit comments

Comments
 (0)