diff --git a/src/include/denc.h b/src/include/denc.h index d04f959d043dc..4febc74a75f86 100644 --- a/src/include/denc.h +++ b/src/include/denc.h @@ -44,11 +44,12 @@ #include "buffer.h" #include "byteorder.h" -template +template struct denc_traits { static constexpr bool supported = false; static constexpr bool featured = false; static constexpr bool bounded = false; + static constexpr bool need_contiguous = true; }; @@ -138,6 +139,7 @@ struct denc_traits { static constexpr bool supported = true; static constexpr bool bounded = false; static constexpr bool featured = false; + static constexpr bool need_contiguous = true; static void bound_encode(const T &o, size_t& p, uint64_t f=0); static void encode(const T &o, buffer::list::contiguous_appender& p, uint64_t f=0); @@ -151,6 +153,7 @@ struct denc_traits { static constexpr bool supported = true; static constexpr bool bounded = false; static constexpr bool featured = true; + static constexpr bool need_contiguous = true; static void bound_encode(const T &o, size_t& p, uint64_t f); static void encode(const T &o, buffer::list::contiguous_appender& p, uint64_t f); @@ -190,6 +193,12 @@ struct denc_traits { declared via WRITE_CLASS_DENC, although you can also invoke them explicitly in your code. + - These methods are optimised for contiguous buffer, but denc() will try + rebuild a contigous one if the decoded bufferlist is segmented. If you are + concerned about the cost, you might want to define yet another method: + + void decode(buffer::list::iterator &p); + - These can be defined either explicitly (as above), or can be "magically" defined all in one go using the DENC macro and DENC_{START,FINISH} helpers (which work like the legacy {ENCODE,DECODE}_{START,FINISH} macros): @@ -212,35 +221,47 @@ struct denc_traits { // --------------------------------------------------------------------- // raw types +namespace _denc { + template struct is_any_of : std::false_type + {}; + template + struct is_any_of : std::conditional< + std::is_same::value, + std::true_type, + is_any_of>::type + {}; +} -#define WRITE_RAW_DENC(type) \ - template<> \ - struct denc_traits { \ - static constexpr bool supported = true; \ - static constexpr bool featured = false; \ - static constexpr bool bounded = true; \ - static void bound_encode(const type &o, size_t& p, uint64_t f=0) { \ - p += sizeof(type); \ - } \ - static void encode(const type &o, \ - buffer::list::contiguous_appender& p, \ - uint64_t f=0) { \ - p.append((const char*)&o, sizeof(o)); \ - } \ - static void decode(type& o, buffer::ptr::iterator &p, \ - uint64_t f=0) { \ - o = *(type *)p.get_pos_add(sizeof(o)); \ - } \ - }; -WRITE_RAW_DENC(ceph_le64) -WRITE_RAW_DENC(ceph_le32) -WRITE_RAW_DENC(ceph_le16) -WRITE_RAW_DENC(uint8_t); +template +struct denc_traits< + T, + typename std::enable_if< + _denc::is_any_of::value>::type> { + static constexpr bool supported = true; + static constexpr bool featured = false; + static constexpr bool bounded = true; + static constexpr bool need_contiguous = false; + static void bound_encode(const T &o, size_t& p, uint64_t f=0) { + p += sizeof(T); + } + static void encode(const T &o, + buffer::list::contiguous_appender& p, + uint64_t f=0) { + p.append((const char*)&o, sizeof(o)); + } + static void decode(T& o, buffer::ptr::iterator &p, + uint64_t f=0) { + o = *(T *)p.get_pos_add(sizeof(o)); + } + static void decode(T& o, buffer::list::iterator &p) { + p.copy(sizeof(T), reinterpret_cast(&o)); + } +}; // ----------------------------------------------------------------------- @@ -254,34 +275,65 @@ WRITE_RAW_DENC(int8_t); // template, and hence get selected. This machinary prevents these these from // getting glued into the legacy encode/decode methods; the overhead of setting // up a contiguous_appender etc is likely to be slower. +namespace _denc { -#define WRITE_INT_DENC(itype, etype) \ - template<> \ - struct denc_traits { \ - static constexpr bool supported = true; \ - static constexpr bool featured = false; \ - static constexpr bool bounded = true; \ - static void bound_encode(const itype &o, size_t& p, uint64_t f=0) { \ - p += sizeof(etype); \ - } \ - static void encode(const itype &o, buffer::list::contiguous_appender& p, \ - uint64_t f=0) { \ - *(etype *)p.get_pos_add(sizeof(etype)) = o; \ - } \ - static void decode(itype& o, buffer::ptr::iterator &p, \ - uint64_t f=0) { \ - o = *(etype*)p.get_pos_add(sizeof(etype)); \ - } \ - }; +template struct ExtType { + using type = void; +}; -WRITE_INT_DENC(uint16_t, __le16); -WRITE_INT_DENC(int16_t, __le16); -WRITE_INT_DENC(uint32_t, __le32); -WRITE_INT_DENC(int32_t, __le32); -WRITE_INT_DENC(uint64_t, __le64); -WRITE_INT_DENC(int64_t, __le64); -WRITE_INT_DENC(bool, uint8_t); +template struct ExtType< + T, + typename std::enable_if::value || + std::is_same::value>::type> { + using type = __le16; +}; + +template struct ExtType< + T, + typename std::enable_if::value || + std::is_same::value>::type> { + using type = __le32; +}; + +template struct ExtType< + T, + typename std::enable_if::value || + std::is_same::value>::type> { + using type = __le64; +}; +template<> struct ExtType { + using type = uint8_t; +}; +} // namespace _denc + +template +struct denc_traits< + T, + typename std::enable_if::type>::value>::type> +{ + static constexpr bool supported = true; + static constexpr bool featured = false; + static constexpr bool bounded = true; + static constexpr bool need_contiguous = false; + using etype = typename _denc::ExtType::type; + static void bound_encode(const T &o, size_t& p, uint64_t f=0) { + p += sizeof(etype); + } + static void encode(const T &o, buffer::list::contiguous_appender& p, + uint64_t f=0) { + *(etype *)p.get_pos_add(sizeof(etype)) = o; + } + static void decode(T& o, buffer::ptr::iterator &p, + uint64_t f=0) { + o = *(etype*)p.get_pos_add(sizeof(etype)); + } + static void decode(T& o, buffer::list::iterator &p) { + etype e; + p.copy(sizeof(etype), reinterpret_cast(&e)); + o = e; + } +}; // varint // @@ -561,6 +613,40 @@ inline typename std::enable_if + struct has_legacy_denc : std::false_type + {}; + template + struct has_legacy_denc() + .decode(std::declval()))> : std::true_type + { + static void decode(T& v, bufferlist::iterator& p) { + v.decode(p); + } + }; + template + struct has_legacy_denc::need_contiguous>::type> : std::true_type + { + static void decode(T& v, bufferlist::iterator& p) { + denc_traits::decode(v, p); + } + }; +} + +template, + typename has_legacy_denc=_denc::has_legacy_denc> +inline typename std::enable_if::type denc( + T& o, + buffer::list::iterator& p) +{ + has_legacy_denc::decode(o, p); +} // --------------------------------------------------------------------- // base types and containers @@ -577,6 +663,7 @@ struct denc_traits,A>> { static constexpr bool supported = true; static constexpr bool featured = false; static constexpr bool bounded = false; + static constexpr bool need_contiguous = false; static void bound_encode(const value_type& s, size_t& p, uint64_t f=0) { p += sizeof(uint32_t) + s.size(); @@ -594,6 +681,13 @@ struct denc_traits,A>> { ::denc(len, p); decode_nohead(len, s, p); } + static void decode(value_type& s, buffer::list::iterator& p) + { + uint32_t len; + ::denc(len, p); + s.clear(); + p.copy(len, s); + } static void decode_nohead(size_t len, value_type& s, buffer::ptr::iterator& p) { s.clear(); @@ -615,6 +709,7 @@ struct denc_traits { static constexpr bool supported = true; static constexpr bool featured = false; static constexpr bool bounded = false; + static constexpr bool need_contiguous = false; static void bound_encode(const bufferptr& v, size_t& p, uint64_t f=0) { p += sizeof(uint32_t) + v.length(); } @@ -628,6 +723,18 @@ struct denc_traits { ::denc(len, p); v = p.get_ptr(len); } + static void decode(bufferptr& v, buffer::list::iterator& p) { + uint32_t len; + ::denc(len, p); + bufferlist s; + p.copy(len, s); + if (len) { + if (s.get_num_buffers() == 1) + v = s.front(); + else + v = buffer::copy(s.c_str(), s.length()); + } + } }; // @@ -638,6 +745,7 @@ struct denc_traits { static constexpr bool supported = true; static constexpr bool featured = false; static constexpr bool bounded = false; + static constexpr bool need_contiguous = false; static void bound_encode(const bufferlist& v, size_t& p, uint64_t f=0) { p += sizeof(uint32_t) + v.length(); } @@ -652,6 +760,12 @@ struct denc_traits { v.clear(); v.push_back(p.get_ptr(len)); } + static void decode(bufferlist& v, buffer::list::iterator& p) { + uint32_t len; + ::denc(len, p); + v.clear(); + p.copy(len, v); + } static void encode_nohead(const bufferlist& v, buffer::list::contiguous_appender& p) { p.append(v); @@ -679,6 +793,8 @@ struct denc_traits< static constexpr bool supported = true; static constexpr bool featured = a_traits::featured || b_traits::featured ; static constexpr bool bounded = a_traits::bounded && b_traits::bounded; + static constexpr bool need_contiguous = (a_traits::need_contiguous || + b_traits::need_contiguous); template static typename std::enable_if + static typename std::enable_if::type + decode(std::pair& v, buffer::list::iterator& p, + uint64_t f = 0) { + denc(v.first, p); + denc(v.second, p); + } }; namespace _denc { @@ -730,6 +853,7 @@ namespace _denc { static constexpr bool supported = true; static constexpr bool featured = traits::featured; static constexpr bool bounded = false; + static constexpr bool need_contiguous = traits::need_contiguous; template static typename std::enable_if + static typename std::enable_if::type + decode(container& s, buffer::list::iterator& p) { + uint32_t num; + denc(num, p); + decode_nohead(num, s, p); + } // nohead template @@ -829,6 +960,18 @@ namespace _denc { Details::insert(s, std::move(t)); } } + template + static typename std::enable_if::type + decode_nohead(size_t num, container& s, + buffer::list::iterator& p) { + s.clear(); + Details::reserve(s, num); + while (num--) { + T t; + denc(t, p); + Details::insert(s, std::move(t)); + } + } }; template @@ -965,6 +1108,7 @@ struct denc_traits< static constexpr bool supported = true; static constexpr bool featured = traits::featured; static constexpr bool bounded = traits::bounded; + static constexpr bool need_contiguous = traits::need_contiguous; template static typename std::enable_if + static typename std::enable_if::type + decode(container& s, buffer::list::iterator& p) { + for (auto& e : s) { + denc(e, p); + } + } }; namespace _denc { @@ -1066,12 +1218,16 @@ namespace _denc { tuple_traits::bounded); static constexpr bool featured = (denc_traits::featured || tuple_traits::featured); + static constexpr bool need_contiguous = + (denc_traits::need_contiguous || + tuple_traits::need_contiguous); }; template<> struct tuple_traits<> { static constexpr bool supported = true; static constexpr bool bounded = true; static constexpr bool featured = false; + static constexpr bool need_contiguous = false; }; } @@ -1167,6 +1323,17 @@ struct denc_traits< _denc::indices) { denc(std::get(s), p); } + template + static void decode_helper(T& s, buffer::list::iterator& p, + _denc::indices) { + denc(std::get(s), p); + decode_helper(s, p, _denc::indices{}); + } + template + static void decode_helper(T& s, buffer::list::iterator& p, + _denc::indices) { + denc(std::get(s), p); + } public: using traits = _denc::tuple_traits; @@ -1174,6 +1341,7 @@ struct denc_traits< static constexpr bool supported = true; static constexpr bool featured = traits::featured; static constexpr bool bounded = traits::bounded; + static constexpr bool need_contiguous = traits::need_contiguous; template @@ -1222,6 +1390,12 @@ struct denc_traits< static void decode(container& s, buffer::ptr::iterator& p, uint64_t f = 0) { decode_helper(s, p, _denc::build_indices_t{}); } + template + static typename std::enable_if::type + decode(container& s, buffer::list::iterator& p, uint64_t f = 0) { + decode_helper(s, p, _denc::build_indices_t{}); + } }; // @@ -1236,6 +1410,7 @@ struct denc_traits< static constexpr bool supported = true; static constexpr bool featured = traits::featured; static constexpr bool bounded = false; + static constexpr bool need_contiguous = traits::need_contiguous; template static typename std::enable_if::type @@ -1280,6 +1455,19 @@ struct denc_traits< } } + template + static typename std::enable_if::type + decode(boost::optional& v, buffer::list::iterator& p) { + bool x; + denc(x, p); + if (x) { + v = T{}; + denc(*v, p); + } else { + v = boost::none; + } + } + template static typename std::enable_if::type encode_nohead(const boost::optional& v, @@ -1312,6 +1500,7 @@ struct denc_traits { static constexpr bool supported = true; static constexpr bool featured = false; static constexpr bool bounded = true; + static constexpr bool need_contiguous = false; static void bound_encode(const boost::none_t& v, size_t& p) { p += sizeof(bool); @@ -1336,6 +1525,7 @@ struct denc_traits { static constexpr bool supported = true; \ static constexpr bool featured = false; \ static constexpr bool bounded = b; \ + static constexpr bool need_contiguous = !_denc::has_legacy_denc::value;\ static void bound_encode(const T& v, size_t& p, uint64_t f=0) { \ v.bound_encode(p); \ } \ @@ -1355,6 +1545,7 @@ struct denc_traits { static constexpr bool supported = true; \ static constexpr bool featured = true; \ static constexpr bool bounded = b; \ + static constexpr bool need_contiguous = !_denc::has_legacy_denc::value;\ static void bound_encode(const T& v, size_t& p, uint64_t f) { \ v.bound_encode(p, f); \ } \ @@ -1399,34 +1590,48 @@ inline typename std::enable_if> +template> inline typename std::enable_if::type decode( + !traits::need_contiguous>::type decode( T& o, bufferlist::iterator& p) { if (p.end()) throw buffer::end_of_buffer(); - // ensure we get a contigous buffer... until the end of the - // bufferlist. we don't really know how much we'll need here, - // unfortunately. hopefully it is already contiguous and we're just - // bumping the raw ref and initializing the ptr tmp fields. - bufferptr tmp; - bufferlist::iterator t = p; - t.copy_shallow(p.get_bl().length() - p.get_off(), tmp); - auto cp = tmp.begin(); - traits::decode(o, cp); - p.advance((ssize_t)cp.get_offset()); + const auto& bl = p.get_bl(); + const auto remaining = bl.length() - p.get_off(); + // it is expensive to rebuild a contigous buffer and drop it, so avoid this. + if (p.get_current_ptr().get_raw() != bl.back().get_raw() && + remaining > CEPH_PAGE_SIZE) { + traits::decode(o, p); + } else { + // ensure we get a contigous buffer... until the end of the + // bufferlist. we don't really know how much we'll need here, + // unfortunately. hopefully it is already contiguous and we're just + // bumping the raw ref and initializing the ptr tmp fields. + bufferptr tmp; + bufferlist::iterator t = p; + t.copy_shallow(remaining, tmp); + auto cp = tmp.begin(); + traits::decode(o, cp); + p.advance((ssize_t)cp.get_offset()); + } } -template> +template> inline typename std::enable_if::type decode( + traits::need_contiguous>::type decode( T& o, bufferlist::iterator& p) { if (p.end()) throw buffer::end_of_buffer(); + // ensure we get a contigous buffer... until the end of the + // bufferlist. we don't really know how much we'll need here, + // unfortunately. hopefully it is already contiguous and we're just + // bumping the raw ref and initializing the ptr tmp fields. bufferptr tmp; bufferlist::iterator t = p; t.copy_shallow(p.get_bl().length() - p.get_off(), tmp); diff --git a/src/include/fs_types.h b/src/include/fs_types.h index 5513fcefd7922..dba085ad13e03 100644 --- a/src/include/fs_types.h +++ b/src/include/fs_types.h @@ -32,6 +32,7 @@ struct denc_traits { static constexpr bool supported = true; static constexpr bool featured = false; static constexpr bool bounded = true; + static constexpr bool need_contiguous = true; static void bound_encode(const inodeno_t &o, size_t& p) { denc(o.val, p); } diff --git a/src/include/interval_set.h b/src/include/interval_set.h index de7cf930894ba..b84d8187052d6 100644 --- a/src/include/interval_set.h +++ b/src/include/interval_set.h @@ -249,15 +249,21 @@ class interval_set { denc_traits>::bound_encode(m, p); } void encode(bufferlist::contiguous_appender& p) const { - denc_traits>::encode(m, p); + denc(m, p); } void decode(bufferptr::iterator& p) { - denc_traits>::decode(m, p); + denc(m, p); _size = 0; - for (typename std::map::const_iterator i = m.begin(); - i != m.end(); - i++) - _size += i->second; + for (const auto& i : m) { + _size += i.second; + } + } + void decode(bufferlist::iterator& p) { + denc(m, p); + _size = 0; + for (const auto& i : m) { + _size += i.second; + } } void encode_nohead(bufferlist::contiguous_appender& p) const { @@ -266,10 +272,9 @@ class interval_set { void decode_nohead(int n, bufferptr::iterator& p) { denc_traits>::decode_nohead(n, m, p); _size = 0; - for (typename std::map::const_iterator i = m.begin(); - i != m.end(); - i++) - _size += i->second; + for (const auto& i : m) { + _size += i.second; + } } void clear() { @@ -576,6 +581,7 @@ struct denc_traits> { static constexpr bool supported = true; static constexpr bool bounded = false; static constexpr bool featured = false; + static constexpr bool need_contiguous = denc_traits::need_contiguous; static void bound_encode(const interval_set& v, size_t& p) { v.bound_encode(p); } @@ -586,6 +592,11 @@ struct denc_traits> { static void decode(interval_set& v, bufferptr::iterator& p) { v.decode(p); } + template + static typename std::enable_if::type + decode(interval_set& v, bufferlist::iterator& p) { + v.decode(p); + } static void encode_nohead(const interval_set& v, bufferlist::contiguous_appender& p) { v.encode_nohead(p); diff --git a/src/include/object.h b/src/include/object.h index 672fbc4c3f91c..1bad9ea24bcfb 100644 --- a/src/include/object.h +++ b/src/include/object.h @@ -127,6 +127,7 @@ struct denc_traits { static constexpr bool supported = true; static constexpr bool featured = false; static constexpr bool bounded = true; + static constexpr bool need_contiguous = true; static void bound_encode(const snapid_t& o, size_t& p) { denc(o.val, p); } diff --git a/src/os/bluestore/bluestore_types.h b/src/os/bluestore/bluestore_types.h index f60b53baf1dd7..6841ead46b06c 100644 --- a/src/os/bluestore/bluestore_types.h +++ b/src/os/bluestore/bluestore_types.h @@ -168,6 +168,7 @@ struct denc_traits { static constexpr bool supported = true; static constexpr bool bounded = false; static constexpr bool featured = false; + static constexpr bool need_contiguous = true; static void bound_encode(const PExtentVector& v, size_t& p) { p += sizeof(uint32_t); const auto size = v.size(); diff --git a/src/test/test_denc.cc b/src/test/test_denc.cc index 0ea97a1169917..8f702735bc809 100644 --- a/src/test/test_denc.cc +++ b/src/test/test_denc.cc @@ -15,6 +15,7 @@ */ #include +#include #include "global/global_init.h" #include "common/ceph_argparse.h" @@ -578,3 +579,120 @@ TEST(denc, optional) ASSERT_EQ(bpi.get_pos(), bl.c_str() + bl.length()); } } + +// unlike legacy_t, Legacy supports denc() also. +struct Legacy { + static unsigned n_denc; + static unsigned n_decode; + uint8_t value = 0; + DENC(Legacy, v, p) { + n_denc++; + denc(v.value, p); + } + void decode(buffer::list::iterator& p) { + n_decode++; + ::decode(value, p); + } + static void reset() { + n_denc = n_decode = 0; + } + static bufferlist encode_n(unsigned n, const vector& segments); +}; +WRITE_CLASS_DENC(Legacy) +unsigned Legacy::n_denc = 0; +unsigned Legacy::n_decode = 0; + +bufferlist Legacy::encode_n(unsigned n, const vector& segments) { + vector v; + for (unsigned i = 0; i < n; i++) { + v.push_back(Legacy()); + } + bufferlist bl(n * sizeof(uint8_t)); + ::encode(v, bl); + bufferlist segmented; + auto p = bl.begin(); + + auto sum = std::accumulate(segments.begin(), segments.end(), 0u); + for (auto i : segments) { + buffer::ptr seg; + p.copy_deep(bl.length() * i / sum, seg); + segmented.push_back(seg); + } + p.copy_all(segmented); + return segmented; +} + +TEST(denc, no_copy_if_segmented_and_lengthy) +{ + static_assert(_denc::has_legacy_denc::value, + "Legacy do have legacy denc"); + { + // use denc() which shallow_copy() if the buffer is small + constexpr unsigned N_COPIES = 42; + const vector segs{50, 50}; // half-half + bufferlist segmented = Legacy::encode_n(N_COPIES, segs); + ASSERT_GT(segmented.get_num_buffers(), 1u); + ASSERT_LT(segmented.length(), CEPH_PAGE_SIZE); + auto p = segmented.begin(); + vector v; + // denc() is shared by encode() and decode(), so reset() right before + // decode() + Legacy::reset(); + ::decode(v, p); + ASSERT_EQ(N_COPIES, v.size()); + ASSERT_EQ(N_COPIES, Legacy::n_denc); + ASSERT_EQ(0u, Legacy::n_decode); + } + { + // use denc() which shallow_copy() if the buffer is not segmented and large + const unsigned N_COPIES = CEPH_PAGE_SIZE * 2; + const vector segs{100}; + bufferlist segmented = Legacy::encode_n(N_COPIES, segs); + ASSERT_EQ(segmented.get_num_buffers(), 1u); + ASSERT_GT(segmented.length(), CEPH_PAGE_SIZE); + auto p = segmented.begin(); + vector v; + Legacy::reset(); + ::decode(v, p); + ASSERT_EQ(N_COPIES, v.size()); + ASSERT_EQ(N_COPIES, Legacy::n_denc); + ASSERT_EQ(0u, Legacy::n_decode); + } + { + // use denc() which shallow_copy() if the buffer is segmented and large, + // but the total size of the chunks to be decoded is smallish. + bufferlist large_bl = Legacy::encode_n(CEPH_PAGE_SIZE * 2, {50, 50}); + bufferlist small_bl = Legacy::encode_n(100, {50, 50}); + bufferlist segmented; + segmented.append(large_bl); + segmented.append(small_bl); + ASSERT_GT(segmented.get_num_buffers(), 1); + ASSERT_GT(segmented.length(), CEPH_PAGE_SIZE); + auto p = segmented.begin(); + p.advance(large_bl.length()); + ASSERT_LT(segmented.length() - p.get_off(), CEPH_PAGE_SIZE); + vector v; + Legacy::reset(); + ::decode(v, p); + ASSERT_EQ(Legacy::n_denc, 100u); + ASSERT_EQ(0u, Legacy::n_decode); + } + { + // use decode() which avoids deep copy if the buffer is segmented and large + bufferlist small_bl = Legacy::encode_n(100, {50, 50}); + bufferlist large_bl = Legacy::encode_n(CEPH_PAGE_SIZE * 2, {50, 50}); + bufferlist segmented; + segmented.append(small_bl); + segmented.append(large_bl); + ASSERT_GT(segmented.get_num_buffers(), 1); + ASSERT_GT(segmented.length(), CEPH_PAGE_SIZE); + auto p = segmented.begin(); + p.advance(small_bl.length()); + ASSERT_GT(segmented.length() - p.get_off(), CEPH_PAGE_SIZE); + vector v; + Legacy::reset(); + ::decode(v, p); + ASSERT_EQ(0u, Legacy::n_denc); + ASSERT_EQ(CEPH_PAGE_SIZE * 2, Legacy::n_decode); + } +}