Skip to content

Commit

Permalink
msg: crc configuration in messenger
Browse files Browse the repository at this point in the history
Add new header_crc and data_crc configuration booleans, and use
them consistently to govern whether CRC is performed in the
Message encode, decode, and transit paths.

Remove ms_nocrc, changes per Sage.
Mimimally adapt AsyncMessenger for crcflags.

Signed-off-by: Casey Bodley <[email protected]>
Signed-off-by: Matt Benjamin <[email protected]>
  • Loading branch information
cbodley authored and Matt Benjamin committed Jan 14, 2015
1 parent b677a86 commit 2ffacbe
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 37 deletions.
3 changes: 2 additions & 1 deletion src/common/config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ OPTION(ms_tcp_rcvbuf, OPT_INT, 0)
OPTION(ms_tcp_prefetch_max_size, OPT_INT, 4096) // max prefetch size, we limit this to avoid extra memcpy
OPTION(ms_initial_backoff, OPT_DOUBLE, .2)
OPTION(ms_max_backoff, OPT_DOUBLE, 15.0)
OPTION(ms_nocrc, OPT_BOOL, false)
OPTION(ms_crc_data, OPT_BOOL, true)
OPTION(ms_crc_header, OPT_BOOL, true)
OPTION(ms_die_on_bad_msg, OPT_BOOL, false)
OPTION(ms_die_on_unhandled_msg, OPT_BOOL, false)
OPTION(ms_die_on_old_message, OPT_BOOL, false) // assert if we get a dup incoming message and shouldn't have (may be triggered by pre-541cd3c64be0dfa04e8a2df39422e0eb9541a428 code)
Expand Down
2 changes: 1 addition & 1 deletion src/messages/MForward.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct MForward : public Message {
::decode(tid, p);
::decode(client, p);
::decode(client_caps, p);
msg = (PaxosServiceMessage *)decode_message(NULL, p);
msg = (PaxosServiceMessage *)decode_message(NULL, 0, p);
if (header.version >= 2) {
::decode(con_features, p);
} else {
Expand Down
6 changes: 3 additions & 3 deletions src/messages/MRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct MRoute : public Message {
MRoute(bufferlist bl, const entity_inst_t& i)
: Message(MSG_ROUTE, HEAD_VERSION, COMPAT_VERSION), session_mon_tid(0), dest(i) {
bufferlist::iterator p = bl.begin();
msg = decode_message(NULL, p);
msg = decode_message(NULL, 0, p);
}
private:
~MRoute() {
Expand All @@ -51,9 +51,9 @@ struct MRoute : public Message {
bool m;
::decode(m, p);
if (m)
msg = decode_message(NULL, p);
msg = decode_message(NULL, 0, p);
} else {
msg = decode_message(NULL, p);
msg = decode_message(NULL, 0, p);
}
}
void encode_payload(uint64_t features) {
Expand Down
2 changes: 1 addition & 1 deletion src/mon/Monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2974,7 +2974,7 @@ void Monitor::resend_routed_requests()
RoutedRequest *rr = p->second;

bufferlist::iterator q = rr->request_bl.begin();
PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, q);
PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);

if (mon == rank) {
dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl;
Expand Down
23 changes: 14 additions & 9 deletions src/msg/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ using namespace std;

#define dout_subsys ceph_subsys_ms

void Message::encode(uint64_t features, bool datacrc)
void Message::encode(uint64_t features, int crcflags)
{
// encode and copy out of *m
if (empty_payload()) {
Expand All @@ -184,17 +184,19 @@ void Message::encode(uint64_t features, bool datacrc)
if (header.compat_version == 0)
header.compat_version = header.version;
}
calc_front_crc();
if (crcflags & MSG_CRC_HEADER)
calc_front_crc();

// update envelope
header.front_len = get_payload().length();
header.middle_len = get_middle().length();
header.data_len = get_data().length();
calc_header_crc();
if (crcflags & MSG_CRC_HEADER)
calc_header_crc();

footer.flags = CEPH_MSG_FOOTER_COMPLETE;

if (datacrc) {
if (crcflags & MSG_CRC_DATA) {
calc_data_crc();

#ifdef ENCODE_DUMP
Expand Down Expand Up @@ -246,11 +248,14 @@ void Message::dump(Formatter *f) const
f->dump_string("summary", ss.str());
}

Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_footer& footer,
bufferlist& front, bufferlist& middle, bufferlist& data)
Message *decode_message(CephContext *cct, int crcflags,
ceph_msg_header& header,
ceph_msg_footer& footer,
bufferlist& front, bufferlist& middle,
bufferlist& data)
{
// verify crc
if (!cct || !cct->_conf->ms_nocrc) {
if (crcflags & MSG_CRC_HEADER) {
__u32 front_crc = front.crc32c(0);
__u32 middle_crc = middle.crc32c(0);

Expand Down Expand Up @@ -788,7 +793,7 @@ void encode_message(Message *msg, uint64_t features, bufferlist& payload)
// We've slipped in a 0 signature at this point, so any signature checking after this will
// fail. PLR

Message *decode_message(CephContext *cct, bufferlist::iterator& p)
Message *decode_message(CephContext *cct, int crcflags, bufferlist::iterator& p)
{
ceph_msg_header h;
ceph_msg_footer_old fo;
Expand All @@ -804,6 +809,6 @@ Message *decode_message(CephContext *cct, bufferlist::iterator& p)
::decode(fr, p);
::decode(mi, p);
::decode(da, p);
return decode_message(cct, h, f, fr, mi, da);
return decode_message(cct, crcflags, h, f, fr, mi, da);
}

12 changes: 9 additions & 3 deletions src/msg/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@
#define MSG_TIMECHECK 0x600
#define MSG_MON_HEALTH 0x601

// *** Message::encode() crcflags bits ***
#define MSG_CRC_DATA 1
#define MSG_CRC_HEADER 2


// ======================================================

Expand Down Expand Up @@ -422,11 +426,12 @@ class Message : public RefCountedObject {

virtual void dump(Formatter *f) const;

void encode(uint64_t features, bool datacrc);
void encode(uint64_t features, int crcflags);
};
typedef boost::intrusive_ptr<Message> MessageRef;

extern Message *decode_message(CephContext *cct, ceph_msg_header &header,
extern Message *decode_message(CephContext *cct, int crcflags,
ceph_msg_header &header,
ceph_msg_footer& footer, bufferlist& front,
bufferlist& middle, bufferlist& data);
inline ostream& operator<<(ostream& out, Message& m) {
Expand All @@ -437,6 +442,7 @@ inline ostream& operator<<(ostream& out, Message& m) {
}

extern void encode_message(Message *m, uint64_t features, bufferlist& bl);
extern Message *decode_message(CephContext *cct, bufferlist::iterator& bl);
extern Message *decode_message(CephContext *cct, int crcflags,
bufferlist::iterator& bl);

#endif
17 changes: 17 additions & 0 deletions src/msg/Messenger.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "include/types.h"
#include "Messenger.h"
Expand All @@ -20,3 +22,18 @@ Messenger *Messenger::create(CephContext *cct, const string &type,
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return NULL;
}

/*
* Pre-calculate desired software CRC settings. CRC computation may
* be disabled by default for some transports (e.g., those with strong
* hardware checksum support).
*/
int Messenger::get_default_crc_flags(md_config_t * conf)
{
int r = 0;
if (conf->ms_crc_data)
r |= MSG_CRC_DATA;
if (conf->ms_crc_header)
r |= MSG_CRC_HEADER;
return r;
}
9 changes: 8 additions & 1 deletion src/msg/Messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Messenger {
* from this value.
*/
CephContext *cct;
int crcflags;

/**
* A Policy describes the rules of a Connection. Is there a limit on how
Expand Down Expand Up @@ -126,7 +127,8 @@ class Messenger {
Messenger(CephContext *cct_, entity_name_t w)
: my_inst(),
default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
cct(cct_)
cct(cct_),
crcflags(get_default_crc_flags(cct->_conf))
{
my_inst.name = w;
}
Expand Down Expand Up @@ -216,6 +218,11 @@ class Messenger {
* (0 if the queue is empty)
*/
virtual double get_dispatch_queue_max_age(utime_t now) = 0;
/**
* Get the default crc flags for this messenger.
* but not yet dispatched.
*/
static int get_default_crc_flags(md_config_t *);

/**
* @} // Accessors
Expand Down
4 changes: 2 additions & 2 deletions src/msg/async/AsyncConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ void AsyncConnection::process()

ldout(async_msgr->cct, 20) << __func__ << " got " << front.length() << " + " << middle.length()
<< " + " << data.length() << " byte message" << dendl;
Message *message = decode_message(async_msgr->cct, current_header, footer, front, middle, data);
Message *message = decode_message(async_msgr->cct, async_msgr->crcflags, current_header, footer, front, middle, data);
if (!message) {
ldout(async_msgr->cct, 1) << __func__ << " decode message failed " << dendl;
goto fail;
Expand Down Expand Up @@ -1947,7 +1947,7 @@ int AsyncConnection::_send(Message *m)
<< features << " " << m << " " << *m << dendl;

// encode and copy out of *m
m->encode(features, !async_msgr->cct->_conf->ms_nocrc);
m->encode(features, async_msgr->crcflags);

// prepare everything
ceph_msg_header& header = m->get_header();
Expand Down
45 changes: 30 additions & 15 deletions src/msg/simple/Pipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1773,7 +1773,7 @@ void Pipe::writer()
<< " " << m << " " << *m << dendl;

// encode and copy out of *m
m->encode(features, !msgr->cct->_conf->ms_nocrc);
m->encode(features, msgr->crcflags);

// prepare everything
ceph_msg_header& header = m->get_header();
Expand Down Expand Up @@ -1876,11 +1876,13 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
ceph_msg_header header;
ceph_msg_footer footer;
__u32 header_crc;

if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
if (tcp_read((char*)&header, sizeof(header)) < 0)
return -1;
header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
if (msgr->crcflags & MSG_CRC_HEADER) {
header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
}
} else {
ceph_msg_header_old oldheader;
if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0)
Expand All @@ -1889,8 +1891,10 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
memcpy(&header, &oldheader, sizeof(header));
header.src = oldheader.src.name;
header.reserved = oldheader.reserved;
header.crc = oldheader.crc;
header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
if (msgr->crcflags & MSG_CRC_HEADER) {
header.crc = oldheader.crc;
header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
}
}

ldout(msgr->cct,20) << "reader got envelope type=" << header.type
Expand All @@ -1901,7 +1905,8 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
<< dendl;

// verify header crc
if (header_crc != header.crc) {
if (!(msgr->crcflags & MSG_CRC_HEADER)) {
} else if (header_crc != header.crc) {
ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
return -1;
}
Expand Down Expand Up @@ -2027,9 +2032,11 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
ceph_msg_footer_old old_footer;
if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
goto out_dethrottle;
footer.front_crc = old_footer.front_crc;
footer.middle_crc = old_footer.middle_crc;
footer.data_crc = old_footer.data_crc;
if (msgr->crcflags & MSG_CRC_HEADER) {
footer.front_crc = old_footer.front_crc;
footer.middle_crc = old_footer.middle_crc;
footer.data_crc = old_footer.data_crc;
}
footer.sig = 0;
footer.flags = old_footer.flags;
}
Expand All @@ -2045,7 +2052,7 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)

ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message" << dendl;
message = decode_message(msgr->cct, header, footer, front, middle, data);
message = decode_message(msgr->cct, msgr->crcflags, header, footer, front, middle, data);
if (!message) {
ret = -EINVAL;
goto out_dethrottle;
Expand Down Expand Up @@ -2241,8 +2248,12 @@ int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, buffer
oldheader.src.addr = connection_state->get_peer_addr();
oldheader.orig_src = oldheader.src;
oldheader.reserved = header.reserved;
oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
sizeof(oldheader) - sizeof(oldheader.crc));
if (msgr->crcflags & MSG_CRC_HEADER) {
oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
sizeof(oldheader) - sizeof(oldheader.crc));
} else {
oldheader.crc = 0;
}
msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
msglen += sizeof(oldheader);
Expand Down Expand Up @@ -2305,9 +2316,13 @@ int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, buffer
msglen += sizeof(footer);
msg.msg_iovlen++;
} else {
old_footer.front_crc = footer.front_crc;
old_footer.middle_crc = footer.middle_crc;
old_footer.data_crc = footer.data_crc;
if (msgr->crcflags & MSG_CRC_HEADER) {
old_footer.front_crc = footer.front_crc;
old_footer.middle_crc = footer.middle_crc;
old_footer.data_crc = footer.data_crc;
} else {
old_footer.front_crc = old_footer.middle_crc = old_footer.data_crc = 0;
}
old_footer.flags = footer.flags;
msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
Expand Down
2 changes: 1 addition & 1 deletion src/test/encoding/ceph_dencoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class MessageDencoderImpl : public Dencoder {
bufferlist::iterator p = bl.begin();
p.seek(seek);
try {
Message *n = decode_message(g_ceph_context, p);
Message *n = decode_message(g_ceph_context, 0, p);
if (!n)
throw std::runtime_error("failed to decode");
if (n->get_type() != m_object->get_type()) {
Expand Down

0 comments on commit 2ffacbe

Please sign in to comment.