diff --git a/sto-core/CMakeLists.txt b/sto-core/CMakeLists.txt index c4a713b8..b9ba56e6 100644 --- a/sto-core/CMakeLists.txt +++ b/sto-core/CMakeLists.txt @@ -9,6 +9,7 @@ add_library(sto TRcu.cc ContentionManager.cc MVCC.hh + MVCCStructs.cc VersionBase.hh OCCVersions.hh EagerVersions.hh diff --git a/sto-core/MVCCStructs.cc b/sto-core/MVCCStructs.cc new file mode 100644 index 00000000..230d6f83 --- /dev/null +++ b/sto-core/MVCCStructs.cc @@ -0,0 +1,25 @@ +#include "MVCCStructs.hh" + +std::ostream& operator<<(std::ostream& w, MvStatus s) { + switch (s) { + case UNUSED: return w << "UNUSED"; + case ABORTED: return w << "ABORTED"; + case PENDING_DELETED: return w << "PENDING_DELETED"; + case COMMITTED_DELETED: return w << "COMMITTED_DELETED"; + case PENDING_DELTA: return w << "PENDING_DELTA"; + case COMMITTED_DELTA: return w << "COMMITTED_DELTA"; + case LOCKED_COMMITTED_DELTA: return w << "LOCKED_COMMITTED_DELTA"; + case PENDING: return w << "PENDING"; + case COMMITTED: return w << "COMMITTED"; + default: return w << "MvStatus[" << (int) s << "]"; + } +} + +void MvHistoryBase::print_prevs(size_t max) const { + auto h = this; + for (size_t i = 0; + h && i < max; + ++i, h = h->prev_.load(std::memory_order_relaxed)) { + std::cerr << i << ". " << (void*) h << " " << h->status_.load(std::memory_order_relaxed) << " W" << h->wtid_ << " R" << h->rtid_.load(std::memory_order_relaxed) << "\n"; + } +} diff --git a/sto-core/MVCCStructs.hh b/sto-core/MVCCStructs.hh index 20bf1f25..fe66e24a 100644 --- a/sto-core/MVCCStructs.hh +++ b/sto-core/MVCCStructs.hh @@ -7,6 +7,7 @@ #include #include "MVCCTypes.hh" +#include "Transaction.hh" #include "TRcu.hh" // Status types of MvHistory elements @@ -26,8 +27,29 @@ enum MvStatus { GARBAGE = 0b1000000 }; +std::ostream& operator<<(std::ostream& w, MvStatus s); + +class MvHistoryBase { +public: + using tid_type = TransactionTid::type; + + MvHistoryBase() = delete; + MvHistoryBase(void* obj, tid_type tid, MvStatus status) + : status_(status), wtid_(tid), rtid_(tid), prev_(nullptr), + obj_(obj) { + } + + void print_prevs(size_t max = 1000) const; + + std::atomic status_; // Status of this element + tid_type wtid_; // Write TID + std::atomic rtid_; // Read TID + std::atomic prev_; + void* obj_; // Parent object +}; + template -class MvHistory { +class MvHistory : protected MvHistoryBase { public: typedef TransactionTid::type tid_type; typedef TRcuSet::epoch_type epoch_type; @@ -36,26 +58,28 @@ public: typedef commutators::Commutator comm_type; MvHistory() = delete; - explicit MvHistory(object_type *obj) : MvHistory(obj, 0, nullptr) {} + explicit MvHistory(object_type *obj) + : MvHistory(obj, 0, nullptr) { + } explicit MvHistory( object_type *obj, tid_type ntid, const T& nv) - : obj_(obj), v_(nv), prev_(nullptr), - status_(PENDING), rtid_(ntid), wtid_(ntid), delete_cb(nullptr) { + : MvHistoryBase(obj, ntid, PENDING), + v_(nv), delete_cb(nullptr) { } explicit MvHistory( object_type *obj, tid_type ntid, T&& nv) - : obj_(obj), v_(std::move(nv)), prev_(nullptr), - status_(PENDING), rtid_(ntid), wtid_(ntid), delete_cb(nullptr) { + : MvHistoryBase(obj, ntid, PENDING), + v_(std::move(nv)), delete_cb(nullptr) { } explicit MvHistory( object_type *obj, tid_type ntid, T *nvp) - : obj_(obj), v_(nvp ? *nvp : v_), prev_(nullptr), - status_(PENDING), rtid_(ntid), wtid_(ntid), delete_cb(nullptr) { + : MvHistoryBase(obj, ntid, PENDING), + v_(nvp ? *nvp : v_ /*XXXXXXX*/), delete_cb(nullptr) { } explicit MvHistory( object_type *obj, tid_type ntid, comm_type &&c) - : obj_(obj), c_(std::move(c)), v_(), prev_(nullptr), - status_(PENDING_DELTA), rtid_(ntid), wtid_(ntid), delete_cb(nullptr) { + : MvHistoryBase(obj, ntid, PENDING_DELTA), + c_(std::move(c)), v_(), delete_cb(nullptr) { } // Whether this version can be late-inserted in front of hnext @@ -75,7 +99,7 @@ public: // Retrieve the object for which this history element is intended inline object_type* object() const { - return obj_; + return static_cast(obj_); } inline history_type* prev() const { @@ -88,6 +112,10 @@ public: } private: + inline history_type* prev_relaxed() const { + return reinterpret_cast(prev_.load(std::memory_order_relaxed)); + } + inline void update_rtid(tid_type minimum_new_rtid) { tid_type prev = rtid_.load(std::memory_order_relaxed); while (prev < minimum_new_rtid @@ -203,10 +231,10 @@ private: // `gc_committed_cb` are in the past). // EXCEPTION: Some nontransactional accesses (`v()`, `nontrans_*`) // ignore this protocol. - history_type* next = h->prev_.load(std::memory_order_relaxed); + history_type* next = h->prev_relaxed(); while (next) { h = next; - next = h->prev_.load(std::memory_order_relaxed); + next = h->prev_relaxed(); Transaction::rcu_call(gc_deleted_cb, h); int status = h->status(); assert(!(status & LOCKED)); @@ -304,14 +332,9 @@ private: } } - object_type * const obj_; // Parent object comm_type c_; T v_; - std::atomic prev_; - std::atomic status_; // Status of this element - std::atomic rtid_; // Read TID - tid_type wtid_; // Write TID void *index_ptr; void (*delete_cb) (void*, void*, void*); void *delete_param; @@ -352,22 +375,22 @@ public: #else MvObject() : h_(new history_type(this)) { if (std::is_trivial::value) { - h_.load()->v_ = T(); + head()->v_ = T(); /* XXXX */ } - h_.load()->status(COMMITTED_DELETED); + head()->status(COMMITTED_DELETED); } explicit MvObject(const T& value) : h_(new history_type(this, 0, value)) { - h_.load()->status(COMMITTED); + head()->status(COMMITTED); } explicit MvObject(T&& value) : h_(new history_type(this, 0, std::move(value))) { - h_.load()->status(COMMITTED); + head()->status(COMMITTED); } template explicit MvObject(Args&&... args) : h_(new history_type(this, 0, T(std::forward(args)...))) { - h_.load()->status(COMMITTED); + head()->status(COMMITTED); } #endif @@ -411,7 +434,7 @@ public: hr->update_rtid(tid); // Read version consistency check - for (history_type* h = h_.load(); h != hr; h = h->prev()) { + for (history_type* h = head(); h != hr; h = h->prev()) { if (!h->status_is(ABORTED) && h->wtid() < tid) { return false; } @@ -451,20 +474,21 @@ public: return false; } - std::atomic* target = &h_; + std::atomic* target = &h_; while (true) { // Discover target atomic on which to do CAS - history_type* t = *target; - if (t->wtid() > tid) { - if (may_commute && !hw->can_precede(t)) { + MvHistoryBase* t = *target; + if (t->wtid_ > tid) { + if (may_commute && !hw->can_precede(static_cast(t))) { return false; } target = &t->prev_; - } else if (!t->status_is(ABORTED) && t->rtid() > tid) { + } else if (!(t->status_.load(std::memory_order_relaxed) & ABORTED) + && t->rtid_.load(std::memory_order_relaxed) > tid) { return false; } else { // Properly link h's prev_ - hw->prev_.store(t, std::memory_order_relaxed); + hw->prev_.store(t, std::memory_order_release); // Attempt to CAS onto the target if (target->compare_exchange_strong(t, hw)) { @@ -475,7 +499,7 @@ public: // Write version consistency check for CU enabling if (may_commute && !hw->can_precede_anything()) { - for (history_type* h = h_.load(); h != hw; h = h->prev()) { + for (history_type* h = head(); h != hw; h = h->prev()) { if (!hw->can_precede(h)) { hw->status_abort(); return false; @@ -545,15 +569,14 @@ public: } history_type* head() const { - return h_.load(std::memory_order_acquire); + return reinterpret_cast(h_.load(std::memory_order_acquire)); } - - inline bool is_head(history_type* const h) const { - return h == h_; + bool is_head(const history_type* h) const { + return head() == h; } // Returns whether the given history element is the inlined version - inline bool is_inlined(history_type* const h) const { + inline bool is_inlined(const history_type* h) const { #if MVCC_INLINING return h == &ih_; #else @@ -581,7 +604,7 @@ public: // Read-only const T& nontrans_access() const { - history_type* h = h_; + history_type* h = head(); history_type* next = nullptr; /* TODO: head version caching */ while (h) { @@ -599,7 +622,7 @@ public: } // Writable version T& nontrans_access() { - history_type* h = h_; + history_type* h = head(); history_type* next = nullptr; /* TODO: head version caching */ while (h) { @@ -638,7 +661,7 @@ protected: } } - std::atomic h_; + std::atomic h_; std::atomic delta_counter = 0; // For gc-time flattening std::atomic flattenv_;