Skip to content

Commit

Permalink
Merge pull request ceph#5259 from branch-predictor/bp-tuned-bufferlists
Browse files Browse the repository at this point in the history
Bufferlist tuning

Reviewed-by: Sage Weil <[email protected]>
Reviewed-by: Haomai Wang <[email protected]>
Reviewed-by: Samuel Just <[email protected]>
  • Loading branch information
liewegas committed Aug 20, 2015
2 parents 181dc28 + 311ef97 commit 779466e
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 49 deletions.
226 changes: 192 additions & 34 deletions src/common/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
#include <ostream>
namespace ceph {

#if defined(__GNUC__) && defined(__x86_64__)
typedef unsigned uint128_t __attribute__ ((mode (TI)));
#endif

#ifdef BUFFER_DEBUG
static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
# define bdout { simple_spin_lock(&buffer_debug_lock); std::cout
Expand Down Expand Up @@ -777,6 +781,38 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
unsigned buffer::ptr::raw_length() const { assert(_raw); return _raw->len; }
int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref.read(); }

void buffer::ptr::copy_out(unsigned o, unsigned l, char *dest) const {
assert(_raw);
if (o+l > _len)
throw end_of_buffer();
char* src = _raw->data + _off + o;
if (l > 8) {
memcpy(dest, src, l);
return;
}
switch (l) {
case 8:
*((uint64_t*)(dest)) = *((uint64_t*)(src));
return;
case 4:
*((uint32_t*)(dest)) = *((uint32_t*)(src));
return;
case 3:
*((uint16_t*)(dest)) = *((uint16_t*)(src));
*((uint8_t*)(dest+2)) = *((uint8_t*)(src+2));
return;
case 2:
*((uint16_t*)(dest)) = *((uint16_t*)(src));
return;
case 1:
*((uint8_t*)(dest)) = *((uint8_t*)(src));
return;
default:
memcpy(dest, src, l);
return;
}
}

unsigned buffer::ptr::wasted()
{
assert(_raw);
Expand All @@ -800,53 +836,175 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;

bool buffer::ptr::is_zero() const
{
const char *data = c_str();
for (size_t p = 0; p < _len; p++) {
if (data[p] != 0) {
return false;
}
const char* data = c_str();
const char* max = data + _len;
const char* max32 = data + (_len / sizeof(uint32_t))*sizeof(uint32_t);
#if defined(__GNUC__) && defined(__x86_64__)
// we do have XMM registers in x86-64, so if we need to check at least
// 16 bytes, make use of them
int left = _len;
if (left / sizeof(uint128_t) > 0) {
// align data pointer to 16 bytes, otherwise it'll segfault due to bug
// in (at least some) GCC versions (using MOVAPS instead of MOVUPS).
// check up to 15 first bytes while at it.
while (((unsigned long long)data) & 15) {
if (*(uint8_t*)data != 0) {
return false;
}
data += sizeof(uint8_t);
left--;
}

const char* max128 = data + (left / sizeof(uint128_t))*sizeof(uint128_t);

while (data < max128) {
if (*(uint128_t*)data != 0) {
return false;
}
data += sizeof(uint128_t);
}
}
#endif
while (data < max32) {
if (*(uint32_t*)data != 0) {
return false;
}
data += sizeof(uint32_t);
}

while (data < max) {
if (*(uint8_t*)data != 0) {
return false;
}
data += sizeof(uint8_t);
}

return true;
}

void buffer::ptr::append(char c)
unsigned buffer::ptr::append(char c)
{
assert(_raw);
assert(1 <= unused_tail_length());
(c_str())[_len] = c;
char* ptr = _raw->data + _off + _len;
*ptr = c;
_len++;
return _len + _off;
}
void buffer::ptr::append(const char *p, unsigned l)

unsigned buffer::ptr::append(const char *p, unsigned l)
{
assert(_raw);
assert(l <= unused_tail_length());
memcpy(c_str() + _len, p, l);
_len += l;
char* c = _raw->data + _off + _len;
if (l <= 32) {
_len += l;
switch (l) {
case 16:
*((uint64_t*)(c)) = *((uint64_t*)(p));
*((uint64_t*)(c+sizeof(uint64_t))) = *((uint64_t*)(p+sizeof(uint64_t)));
return _len + _off;
case 8:
*((uint64_t*)(c)) = *((uint64_t*)(p));
return _len + _off;
case 4:
*((uint32_t*)(c)) = *((uint32_t*)(p));
return _len + _off;
case 2:
*((uint16_t*)(c)) = *((uint16_t*)(p));
return _len + _off;
case 1:
*((uint8_t*)(c)) = *((uint8_t*)(p));
return _len + _off;
}
int cursor = 0;
while (l >= sizeof(uint64_t)) {
*((uint64_t*)(c + cursor)) = *((uint64_t*)(p + cursor));
cursor += sizeof(uint64_t);
l -= sizeof(uint64_t);
}
while (l >= sizeof(uint32_t)) {
*((uint32_t*)(c + cursor)) = *((uint32_t*)(p + cursor));
cursor += sizeof(uint32_t);
l -= sizeof(uint32_t);
}
while (l > 0) {
*(c+cursor) = *(p+cursor);
cursor++;
l--;
}
}
else {
memcpy(c, p, l);
_len += l;
}
return _len + _off;
}

void buffer::ptr::copy_in(unsigned o, unsigned l, const char *src)
void buffer::ptr::copy_in(unsigned o, unsigned l, const char *src, bool crc_reset)
{
assert(_raw);
assert(o <= _len);
assert(o+l <= _len);
_raw->invalidate_crc();
memcpy(c_str()+o, src, l);
char* dest = _raw->data + _off + o;
if (crc_reset)
_raw->invalidate_crc();
if (l < 64) {
switch (l) {
case 1:
*((uint8_t*)(dest)) = *((uint8_t*)(src));
return;
case 2:
*((uint16_t*)(dest)) = *((uint16_t*)(src));
return;
case 3:
*((uint16_t*)(dest)) = *((uint16_t*)(src));
*((uint8_t*)(dest+2)) = *((uint8_t*)(src+2));
return;
case 4:
*((uint32_t*)(dest)) = *((uint32_t*)(src));
return;
case 8:
*((uint64_t*)(dest)) = *((uint64_t*)(src));
return;
default:
int cursor = 0;
while (l >= sizeof(uint64_t)) {
*((uint64_t*)(dest + cursor)) = *((uint64_t*)(src + cursor));
cursor += sizeof(uint64_t);
l -= sizeof(uint64_t);
}
while (l >= sizeof(uint32_t)) {
*((uint32_t*)(dest + cursor)) = *((uint32_t*)(src + cursor));
cursor += sizeof(uint32_t);
l -= sizeof(uint32_t);
}
while (l > 0) {
*(dest + cursor) = *(src + cursor);
cursor++;
l--;
}
return;
}
} else {
memcpy(dest, src, l);
}
}

void buffer::ptr::zero()
void buffer::ptr::zero(bool crc_reset)
{
_raw->invalidate_crc();
if (crc_reset)
_raw->invalidate_crc();
memset(c_str(), 0, _len);
}

void buffer::ptr::zero(unsigned o, unsigned l)
void buffer::ptr::zero(unsigned o, unsigned l, bool crc_reset)
{
assert(o+l <= _len);
_raw->invalidate_crc();
if (crc_reset)
_raw->invalidate_crc();
memset(c_str()+o, 0, l);
}

bool buffer::ptr::can_zero_copy() const
{
return _raw->can_zero_copy();
Expand Down Expand Up @@ -1022,7 +1180,7 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;

// copy data in

void buffer::list::iterator::copy_in(unsigned len, const char *src)
void buffer::list::iterator::copy_in(unsigned len, const char *src, bool crc_reset)
{
// copy
if (p == ls->end())
Expand All @@ -1034,7 +1192,7 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
unsigned howmuch = p->length() - p_off;
if (len < howmuch)
howmuch = len;
p->copy_in(p_off, howmuch, src);
p->copy_in(p_off, howmuch, src, crc_reset);

src += howmuch;
len -= howmuch;
Expand Down Expand Up @@ -1231,12 +1389,13 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
for (std::list<ptr>::iterator it = _buffers.begin();
it != _buffers.end();
++it) {
nb.copy_in(pos, it->length(), it->c_str());
nb.copy_in(pos, it->length(), it->c_str(), false);
pos += it->length();
}
_memcopy_count += pos;
_buffers.clear();
_buffers.push_back(nb);
invalidate_crc();
}

void buffer::list::rebuild_aligned(unsigned align)
Expand Down Expand Up @@ -1346,14 +1505,14 @@ void buffer::list::rebuild_page_aligned()
return last_p.copy(len, dest);
}

void buffer::list::copy_in(unsigned off, unsigned len, const char *src)
void buffer::list::copy_in(unsigned off, unsigned len, const char *src, bool crc_reset)
{
if (off + len > length())
throw end_of_buffer();

if (last_p.get_off() != off)
last_p.seek(off);
last_p.copy_in(len, src);
last_p.copy_in(len, src, crc_reset);
}

void buffer::list::copy_in(unsigned off, unsigned len, const list& src)
Expand All @@ -1372,25 +1531,24 @@ void buffer::list::rebuild_page_aligned()
append_buffer = create_aligned(CEPH_BUFFER_APPEND_SIZE, CEPH_BUFFER_APPEND_SIZE);
append_buffer.set_length(0); // unused, so far.
}
append_buffer.append(c);
append(append_buffer, append_buffer.end() - 1, 1); // add segment to the list
append(append_buffer, append_buffer.append(c) - 1, 1); // add segment to the list
}

void buffer::list::append(const char *data, unsigned len)
{
while (len > 0) {
// put what we can into the existing append_buffer.
unsigned gap = append_buffer.unused_tail_length();
if (gap > 0) {
if (gap > len) gap = len;
//cout << "append first char is " << data[0] << ", last char is " << data[len-1] << std::endl;
append_buffer.append(data, gap);
append(append_buffer, append_buffer.end() - gap, gap); // add segment to the list
len -= gap;
data += gap;
if (gap > len) gap = len;
//cout << "append first char is " << data[0] << ", last char is " << data[len-1] << std::endl;
append_buffer.append(data, gap);
append(append_buffer, append_buffer.end() - gap, gap); // add segment to the list
len -= gap;
data += gap;
}
if (len == 0)
break; // done!
break; // done!

// make a new append_buffer!
unsigned alen = CEPH_BUFFER_APPEND_SIZE * (((len-1) / CEPH_BUFFER_APPEND_SIZE) + 1);
Expand Down
21 changes: 8 additions & 13 deletions src/include/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,7 @@ class CEPH_BUFFER_API buffer {
unsigned raw_length() const;
int raw_nref() const;

void copy_out(unsigned o, unsigned l, char *dest) const {
assert(_raw);
if (!((o <= _len) && (o+l <= _len)))
throw end_of_buffer();
memcpy(dest, c_str()+o, l);
}
void copy_out(unsigned o, unsigned l, char *dest) const;

bool can_zero_copy() const;
int zero_copy_to_fd(int fd, int64_t *offset) const;
Expand All @@ -246,11 +241,11 @@ class CEPH_BUFFER_API buffer {
_len = l;
}

void append(char c);
void append(const char *p, unsigned l);
void copy_in(unsigned o, unsigned l, const char *src);
void zero();
void zero(unsigned o, unsigned l);
unsigned append(char c);
unsigned append(const char *p, unsigned l);
void copy_in(unsigned o, unsigned l, const char *src, bool crc_reset = true);
void zero(bool crc_reset = true);
void zero(unsigned o, unsigned l, bool crc_reset = true);

};

Expand Down Expand Up @@ -314,7 +309,7 @@ class CEPH_BUFFER_API buffer {
void copy_all(list &dest);

// copy data in
void copy_in(unsigned len, const char *src);
void copy_in(unsigned len, const char *src, bool crc_reset = true);
void copy_in(unsigned len, const list& otherl);

};
Expand Down Expand Up @@ -449,7 +444,7 @@ class CEPH_BUFFER_API buffer {
void copy(unsigned off, unsigned len, char *dest) const;
void copy(unsigned off, unsigned len, list &dest) const;
void copy(unsigned off, unsigned len, std::string& dest) const;
void copy_in(unsigned off, unsigned len, const char *src);
void copy_in(unsigned off, unsigned len, const char *src, bool crc_reset = true);
void copy_in(unsigned off, unsigned len, const list& src);

void append(char c);
Expand Down
6 changes: 4 additions & 2 deletions src/include/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ inline void encode(const std::string& s, bufferlist& bl, uint64_t features=0)
{
__u32 len = s.length();
encode(len, bl);
bl.append(s.data(), len);
if (len)
bl.append(s.data(), len);
}
inline void decode(std::string& s, bufferlist::iterator& p)
{
Expand All @@ -189,7 +190,8 @@ inline void encode(const char *s, bufferlist& bl)
{
__u32 len = strlen(s);
encode(len, bl);
bl.append(s, len);
if (len)
bl.append(s, len);
}


Expand Down

0 comments on commit 779466e

Please sign in to comment.