Skip to content

Commit

Permalink
Merge PR ceph#26541 into master
Browse files Browse the repository at this point in the history
* refs/pull/26541/head:
	msg/async: guard protocol Interceptor with preprocessor variable
	cmake: add UNIT_TESTS_BUILD preprocessor var based on WITH_TESTS cmake var
	test/msgr: remove test_msgr2 unused test
	msg/async: msgr2: resolve reconnect races
	test/msgr: msgr2 unit tests using the protocol interceptor WIP
	msg/async: Protocol interceptor infrastructure

Reviewed-by: Sage Weil <[email protected]>
  • Loading branch information
liewegas committed Feb 21, 2019
2 parents 1ed2622 + 1920d3a commit e084dd8
Show file tree
Hide file tree
Showing 9 changed files with 686 additions and 1,585 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ option(ENABLE_COVERAGE "Coverage is enabled" OFF)
option(PG_DEBUG_REFS "PG Ref debugging is enabled" OFF)

option(WITH_TESTS "enable the build of ceph-test package scripts/binaries" ON)
set("UNIT_TESTS_BUILT", ${WITH_TESTS})
set(CEPH_TEST_TIMEOUT 3600 CACHE STRING
"Maximum time before a CTest gets killed" )

Expand Down
3 changes: 3 additions & 0 deletions src/include/config-h.in.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -351,4 +351,7 @@
/* Define if seastar is available. */
#cmakedefine HAVE_SEASTAR

/* Define if unit tests are built. */
#cmakedefine UNIT_TESTS_BUILT

#endif /* CONFIG_H */
8 changes: 8 additions & 0 deletions src/msg/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@

class Messenger;

#ifdef UNIT_TESTS_BUILT
class Interceptor;
#endif

struct Connection : public RefCountedObject {
mutable Mutex lock;
Messenger *msgr;
Expand All @@ -61,6 +65,10 @@ struct Connection : public RefCountedObject {
EntityName peer_name;
uint64_t peer_global_id = 0;

#ifdef UNIT_TESTS_BUILT
Interceptor *interceptor;
#endif

friend class boost::intrusive_ptr<Connection>;
friend class PipeConnection;

Expand Down
22 changes: 22 additions & 0 deletions src/msg/Messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@ class Timer;
class AuthClient;
class AuthServer;

#ifdef UNIT_TESTS_BUILT

struct Interceptor {
std::mutex lock;
std::condition_variable cond_var;

enum ACTION : uint32_t {
CONTINUE = 0,
FAIL,
STOP
};

virtual ~Interceptor() {}
virtual ACTION intercept(Connection *conn, uint32_t step) = 0;
};

#endif

class Messenger {
private:
std::deque<Dispatcher*> dispatchers;
Expand Down Expand Up @@ -76,6 +94,10 @@ class Messenger {
AuthClient *auth_client = 0;
AuthServer *auth_server = 0;

#ifdef UNIT_TESTS_BUILT
Interceptor *interceptor = nullptr;
#endif

/**
* Various Messenger conditional config/type flags to allow
* different "transport" Messengers to tune themselves
Expand Down
3 changes: 3 additions & 0 deletions src/msg/async/AsyncConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
msgr2(m2), state_offset(0),
worker(w), center(&w->center),read_buffer(nullptr)
{
#ifdef UNIT_TESTS_BUILT
this->interceptor = m->interceptor;
#endif
read_handler = new C_handle_read(this);
write_handler = new C_handle_write(this);
write_callback_handler = new C_handle_write_callback(this);
Expand Down
87 changes: 80 additions & 7 deletions src/msg/async/ProtocolV2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ const int SIGNATURE_BLOCK_SIZE = CEPH_CRYPTO_HMACSHA256_DIGESTSIZE;

#define READB(L, B, C) read(CONTINUATION(C), L, B)

#ifdef UNIT_TESTS_BUILT

#define INTERCEPT(S) { \
if(connection->interceptor) { \
auto a = connection->interceptor->intercept(connection, (S)); \
if (a == Interceptor::ACTION::FAIL) { \
return _fault(); \
} else if (a == Interceptor::ACTION::STOP) { \
stop(); \
connection->dispatch_queue->queue_reset(connection); \
return nullptr; \
}}}

#else
#define INTERCEPT(S)
#endif

static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
// create a buffer to read into that matches the data alignment
unsigned alloc_len = 0;
Expand Down Expand Up @@ -1254,6 +1271,8 @@ CtPtr ProtocolV2::_banner_exchange(CtPtr callback) {
encode((uint16_t)banner_payload.length(), bl, 0);
bl.claim_append(banner_payload);

INTERCEPT(state == CONNECTING ? 3 : 4);

return WRITE(bl, "banner", _wait_for_peer_banner);
}

Expand Down Expand Up @@ -1299,6 +1318,9 @@ CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) {
// temp_buffer size as well

next_payload_len = payload_len;

INTERCEPT(state == CONNECTING ? 5 : 6);

return READ(next_payload_len, _handle_peer_banner_payload);
}

Expand Down Expand Up @@ -1358,6 +1380,9 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) {
}

HelloFrame hello(this, messenger->get_mytype(), connection->target_addr);

INTERCEPT(state == CONNECTING ? 7 : 8);

return WRITE(hello.get_buffer(), "hello frame", read_frame);
}

Expand Down Expand Up @@ -1556,6 +1581,8 @@ CtPtr ProtocolV2::ready() {
<< server_cookie << std::dec << " in_seq=" << in_seq
<< " out_seq=" << out_seq << dendl;

INTERCEPT(15);

return CONTINUE(read_frame);
}

Expand Down Expand Up @@ -1605,6 +1632,8 @@ CtPtr ProtocolV2::handle_message_header(char *buffer, int r) {
}
}

INTERCEPT(16);

// Reset state
data_buf.clear();
front.clear();
Expand Down Expand Up @@ -1913,6 +1942,8 @@ CtPtr ProtocolV2::handle_message_complete() {
return _fault();
}

INTERCEPT(17);

message->set_byte_throttler(connection->policy.throttler_bytes);
message->set_message_throttler(connection->policy.throttler_messages);

Expand Down Expand Up @@ -2083,6 +2114,9 @@ CtPtr ProtocolV2::handle_message_ack(char *payload, uint32_t length) {

CtPtr ProtocolV2::start_client_banner_exchange() {
ldout(cct, 20) << __func__ << dendl;

INTERCEPT(1);

state = CONNECTING;

global_seq = messenger->get_global_seq();
Expand Down Expand Up @@ -2122,6 +2156,9 @@ CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
connection->dispatch_queue->queue_reset(connection);
return nullptr;
}

INTERCEPT(9);

AuthRequestFrame frame(auth_meta->auth_method, preferred_modes, bl);
return WRITE(frame.get_buffer(), "auth request", read_frame);
}
Expand Down Expand Up @@ -2279,8 +2316,9 @@ CtPtr ProtocolV2::send_client_ident() {
<< " flags=" << flags
<< " cookie=" << client_cookie << std::dec << dendl;

bufferlist &bl = client_ident.get_buffer();
return WRITE(bl, "client ident", read_frame);
INTERCEPT(11);

return WRITE(client_ident.get_buffer(), "client ident", read_frame);
}

CtPtr ProtocolV2::send_reconnect() {
Expand All @@ -2298,6 +2336,9 @@ CtPtr ProtocolV2::send_reconnect() {
<< server_cookie << std::dec
<< " gs=" << global_seq << " cs=" << connect_seq
<< " ms=" << in_seq << dendl;

INTERCEPT(13);

return WRITE(reconnect.get_buffer(), "reconnect", read_frame);
}

Expand Down Expand Up @@ -2441,6 +2482,9 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) {

CtPtr ProtocolV2::start_server_banner_exchange() {
ldout(cct, 20) << __func__ << dendl;

INTERCEPT(2);

state = ACCEPTING;

return _banner_exchange(CONTINUATION(post_server_banner_exchange));
Expand Down Expand Up @@ -2521,6 +2565,8 @@ CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
return _fault();
}
if (r == 1) {
INTERCEPT(10);

AuthDoneFrame auth_done(connection->peer_global_id, auth_meta->con_mode,
reply);
return WRITE(auth_done.get_buffer(), "auth done", read_frame);
Expand Down Expand Up @@ -2736,10 +2782,13 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {
<< " cgs=" << reconnect.global_seq()
<< ", ask client to retry global" << dendl;
RetryGlobalFrame retry(this, exproto->peer_global_seq);

INTERCEPT(18);

return WRITE(retry.get_buffer(), "session retry", read_frame);
}

if (exproto->connect_seq >= reconnect.connect_seq()) {
if (exproto->connect_seq > reconnect.connect_seq()) {
ldout(cct, 5) << __func__
<< " stale connect_seq scs=" << exproto->connect_seq
<< " ccs=" << reconnect.connect_seq()
Expand All @@ -2748,6 +2797,28 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {
return WRITE(retry.get_buffer(), "session retry", read_frame);
}

if (exproto->connect_seq == reconnect.connect_seq()) {
// reconnect race: both peers are sending reconnect messages
if (existing->peer_addrs->msgr2_addr() >
messenger->get_myaddrs().msgr2_addr() &&
!existing->policy.server) {
// the existing connection wins
ldout(cct, 1)
<< __func__
<< " reconnect race detected, this connection loses to existing="
<< existing << dendl;

WaitFrame wait;
return WRITE(wait.get_buffer(), "wait", read_frame);
} else {
// this connection wins
ldout(cct, 1) << __func__
<< " reconnect race detected, replacing existing="
<< existing << " socket by this connection's socket"
<< dendl;
}
}

ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;

reconnecting = true;
Expand Down Expand Up @@ -3044,8 +3115,9 @@ CtPtr ProtocolV2::send_server_ident() {
connection->dispatch_queue->queue_accept(connection);
messenger->ms_deliver_handle_fast_accept(connection);

bufferlist &bl = server_ident.get_buffer();
return WRITE(bl, "server ident", server_ready);
INTERCEPT(12);

return WRITE(server_ident.get_buffer(), "server ident", server_ready);
}

CtPtr ProtocolV2::server_ready() {
Expand Down Expand Up @@ -3098,6 +3170,7 @@ CtPtr ProtocolV2::send_reconnect_ok() {
connection->dispatch_queue->queue_accept(connection);
messenger->ms_deliver_handle_fast_accept(connection);

bufferlist &bl = reconnect_ok.get_buffer();
return WRITE(bl, "reconnect ok", server_ready);
INTERCEPT(14);

return WRITE(reconnect_ok.get_buffer(), "reconnect ok", server_ready);
}
7 changes: 0 additions & 7 deletions src/test/msgr/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ add_executable(ceph_test_msgr
)
target_link_libraries(ceph_test_msgr os global ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS})

# ceph_test_msgr2
add_executable(ceph_test_msgr2
test_msgr2.cc
)
target_link_libraries(ceph_test_msgr2 os global ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS})

# ceph_test_async_networkstack
add_executable(ceph_test_async_networkstack
test_async_networkstack.cc
Expand Down Expand Up @@ -46,7 +40,6 @@ endif(HAVE_DPDK)
install(TARGETS
ceph_test_async_driver
ceph_test_msgr
ceph_test_msgr2
ceph_test_async_networkstack
ceph_perf_msgr_server
ceph_perf_msgr_client
Expand Down
Loading

0 comments on commit e084dd8

Please sign in to comment.