diff --git a/include/libipc/ipc.h b/include/libipc/ipc.h index 64b262c9..0b4459f6 100755 --- a/include/libipc/ipc.h +++ b/include/libipc/ipc.h @@ -31,8 +31,6 @@ struct IPC_EXPORT chan_impl { static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm); static buff_t recv(ipc::handle_t h, std::uint64_t tm); - - static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm); static buff_t try_recv(ipc::handle_t h); }; @@ -89,7 +87,7 @@ class chan_wrapper { } chan_wrapper clone() const { - return chan_wrapper { name(), mode_ }; + return chan_wrapper {name(), mode_}; } /** @@ -128,9 +126,6 @@ class chan_wrapper { return chan_wrapper(name).wait_for_recv(r_count, tm); } - /** - * If timeout, this function would call 'force_push' to send the data forcibly. - */ bool send(void const * data, std::size_t size, std::uint64_t tm = default_timeout) { return detail_t::send(h_, data, size, tm); } @@ -141,23 +136,9 @@ class chan_wrapper { return this->send(str.c_str(), str.size() + 1, tm); } - /** - * If timeout, this function would just return false. - */ - bool try_send(void const * data, std::size_t size, std::uint64_t tm = default_timeout) { - return detail_t::try_send(h_, data, size, tm); - } - bool try_send(buff_t const & buff, std::uint64_t tm = default_timeout) { - return this->try_send(buff.data(), buff.size(), tm); - } - bool try_send(std::string const & str, std::uint64_t tm = default_timeout) { - return this->try_send(str.c_str(), str.size() + 1, tm); - } - buff_t recv(std::uint64_t tm = invalid_value) { return detail_t::recv(h_, tm); } - buff_t try_recv() { return detail_t::try_recv(h_); } diff --git a/src/libipc/circ/elem_array.h b/src/libipc/circ/elem_array.h index 0b21f486..5febcba3 100755 --- a/src/libipc/circ/elem_array.h +++ b/src/libipc/circ/elem_array.h @@ -120,20 +120,15 @@ class elem_array : public ipc::circ::conn_head { return head_.cursor(); } - template - bool push(Q* que, F&& f) { - return head_.push(que, std::forward(f), block_); + template + bool push(F&& f) { + return head_.push(std::forward(f), block_); } - template - bool force_push(Q* que, F&& f) { - return head_.force_push(que, std::forward(f), block_); - } - - template - bool pop(Q* que, cursor_t* cur, F&& f, R&& out) { + template + bool pop(cursor_t* cur, F&& f) { if (cur == nullptr) return false; - return head_.pop(que, *cur, std::forward(f), std::forward(out), block_); + return head_.pop(*cur, std::forward(f), block_); } }; diff --git a/src/libipc/circ/elem_def.h b/src/libipc/circ/elem_def.h index 40039480..e14ef01a 100755 --- a/src/libipc/circ/elem_def.h +++ b/src/libipc/circ/elem_def.h @@ -50,40 +50,8 @@ class conn_head_base { } }; -template ::is_broadcast> -class conn_head; - -template -class conn_head : public conn_head_base { -public: - cc_t connect() noexcept { - for (unsigned k = 0;; ipc::yield(k)) { - cc_t curr = this->cc_.load(std::memory_order_acquire); - cc_t next = curr | (curr + 1); // find the first 0, and set it to 1. - if (next == 0) { - // connection-slot is full. - return 0; - } - if (this->cc_.compare_exchange_weak(curr, next, std::memory_order_release)) { - return next ^ curr; // return connected id - } - } - } - - cc_t disconnect(cc_t cc_id) noexcept { - return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id; - } - - std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { - cc_t cur = this->cc_.load(order); - cc_t cnt; // accumulates the total bits set in cc - for (cnt = 0; cur; ++cnt) cur &= cur - 1; - return cnt; - } -}; - template -class conn_head : public conn_head_base { +class conn_head : public conn_head_base { public: cc_t connect() noexcept { return this->cc_.fetch_add(1, std::memory_order_relaxed) + 1; diff --git a/src/libipc/ipc.cpp b/src/libipc/ipc.cpp index c864a1b1..58d7979f 100755 --- a/src/libipc/ipc.cpp +++ b/src/libipc/ipc.cpp @@ -484,27 +484,6 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s } static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { - return send([tm](auto info, auto que, auto msg_id) { - return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { - if (!wait_for(info->wt_waiter_, [&] { - return !que->push( - [](void*) { return true; }, - info->cc_id_, msg_id, remain, data, size); - }, tm)) { - ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); - if (!que->force_push( - clear_message, - info->cc_id_, msg_id, remain, data, size)) { - return false; - } - } - info->rd_waiter_.broadcast(); - return true; - }; - }, h, data, size); -} - -static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { @@ -676,11 +655,6 @@ buff_t chan_impl::recv(ipc::handle_t h, std::uint64_t tm) { return detail_impl>::recv(h, tm); } -template -bool chan_impl::try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { - return detail_impl>::try_send(h, data, size, tm); -} - template buff_t chan_impl::try_recv(ipc::handle_t h) { return detail_impl>::try_recv(h); diff --git a/src/libipc/platform/posix/get_error.h b/src/libipc/platform/posix/get_error.h new file mode 100644 index 00000000..c552f226 --- /dev/null +++ b/src/libipc/platform/posix/get_error.h @@ -0,0 +1,14 @@ +#pragma once + +#include +#include + +namespace ipc { +namespace detail { + +inline char const *curr_error() noexcept { + return ::strerror(errno); +} + +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/posix/semaphore_impl.h b/src/libipc/platform/posix/semaphore_impl.h index d48bcd4b..94a5350a 100644 --- a/src/libipc/platform/posix/semaphore_impl.h +++ b/src/libipc/platform/posix/semaphore_impl.h @@ -11,6 +11,7 @@ #include "libipc/shm.h" #include "get_wait_time.h" +#include "get_error.h" namespace ipc { namespace detail { @@ -40,7 +41,7 @@ class semaphore { } h_ = ::sem_open(name, O_CREAT, 0666, static_cast(count)); if (h_ == SEM_FAILED) { - ipc::error("fail sem_open[%d]: %s\n", errno, name); + ipc::error("fail sem_open[%s]: name = %s\n", curr_error(), name); return false; } return true; @@ -49,14 +50,14 @@ class semaphore { void close() noexcept { if (!valid()) return; if (::sem_close(h_) != 0) { - ipc::error("fail sem_close[%d]: %s\n", errno); + ipc::error("fail sem_close[%s]\n", curr_error()); } h_ = SEM_FAILED; if (shm_.name() != nullptr) { std::string name = shm_.name(); if (shm_.release() <= 1) { if (::sem_unlink(name.c_str()) != 0) { - ipc::error("fail sem_unlink[%d]: %s, name: %s\n", errno, name.c_str()); + ipc::error("fail sem_unlink[%s]: name = %s\n", curr_error(), name.c_str()); } } } @@ -66,15 +67,15 @@ class semaphore { if (!valid()) return false; if (tm == invalid_value) { if (::sem_wait(h_) != 0) { - ipc::error("fail sem_wait[%d]: %s\n", errno); + ipc::error("fail sem_wait[%s]\n", curr_error()); return false; } } else { auto ts = detail::make_timespec(tm); if (::sem_timedwait(h_, &ts) != 0) { if (errno != ETIMEDOUT) { - ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - errno, tm, ts.tv_sec, ts.tv_nsec); + ipc::error("fail sem_timedwait[%s]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + curr_error(), tm, ts.tv_sec, ts.tv_nsec); } return false; } @@ -86,7 +87,7 @@ class semaphore { if (!valid()) return false; for (std::uint32_t i = 0; i < count; ++i) { if (::sem_post(h_) != 0) { - ipc::error("fail sem_post[%d]: %s\n", errno); + ipc::error("fail sem_post[%s]\n", curr_error()); return false; } } diff --git a/src/libipc/platform/posix/shm_posix.cpp b/src/libipc/platform/posix/shm_posix.cpp index 7f70b070..9bf09ca0 100644 --- a/src/libipc/platform/posix/shm_posix.cpp +++ b/src/libipc/platform/posix/shm_posix.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include @@ -18,6 +17,8 @@ #include "libipc/utility/log.h" #include "libipc/memory/resource.h" +#include "get_error.h" + namespace { struct info_t { @@ -70,7 +71,7 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) { S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); if (fd == -1) { - ipc::error("fail shm_open[%d]: %s\n", errno, name); + ipc::error("fail shm_open[%s]: name = %s\n", curr_error(), name); return nullptr; } auto ii = mem::alloc(); @@ -122,7 +123,7 @@ void * get_mem(id_t id, std::size_t * size) { if (ii->size_ == 0) { struct stat st; if (::fstat(fd, &st) != 0) { - ipc::error("fail fstat[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); + ipc::error("fail fstat[%s]: name = %s, size = %zd\n", curr_error(), ii->name_.c_str(), ii->size_); return nullptr; } ii->size_ = static_cast(st.st_size); @@ -130,17 +131,16 @@ void * get_mem(id_t id, std::size_t * size) { ipc::error("fail get_mem: %s, invalid size = %zd\n", ii->name_.c_str(), ii->size_); return nullptr; } - } - else { + } else { ii->size_ = calc_size(ii->size_); if (::ftruncate(fd, static_cast(ii->size_)) != 0) { - ipc::error("fail ftruncate[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); + ipc::error("fail ftruncate[%s]: name = %s, size = %zd\n", curr_error(), ii->name_.c_str(), ii->size_); return nullptr; } } void* mem = ::mmap(nullptr, ii->size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (mem == MAP_FAILED) { - ipc::error("fail mmap[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); + ipc::error("fail mmap[%s]: name = %s, size = %zd\n", curr_error(), ii->name_.c_str(), ii->size_); return nullptr; } ::close(fd); diff --git a/src/libipc/prod_cons.h b/src/libipc/prod_cons.h index 28d99bda..1763209e 100755 --- a/src/libipc/prod_cons.h +++ b/src/libipc/prod_cons.h @@ -37,8 +37,8 @@ struct prod_cons_impl> { return 0; } - template - bool push(W* /*wrapper*/, F&& f, E* elems) { + template + bool push(F&& f, E* elems) { auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed)); if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) { return false; // full @@ -48,24 +48,13 @@ struct prod_cons_impl> { return true; } - /** - * In single-single-unicast, 'force_push' means 'no reader' or 'the only one reader is dead'. - * So we could just disconnect all connections of receiver, and return false. - */ - template - bool force_push(W* wrapper, F&&, E*) { - wrapper->elems()->disconnect_receiver(~static_cast(0u)); - return false; - } - - template - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { + template + bool pop(circ::u2_t& /*cur*/, F&& f, E* elems) { auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } std::forward(f)(&(elems[cur_rd].data_)); - std::forward(out)(true); rd_.fetch_add(1, std::memory_order_release); return true; } @@ -75,26 +64,17 @@ template <> struct prod_cons_impl> : prod_cons_impl> { - template - bool force_push(W* wrapper, F&&, E*) { - wrapper->elems()->disconnect_receiver(1); - return false; - } - - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { - byte_t buff[DS]; + bool pop(circ::u2_t& /*cur*/, F&& f, E* elems) { for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); if (circ::index_of(cur_rd) == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } - std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { - std::forward(f)(buff); - std::forward(out)(true); + std::forward(f)(&(elems[circ::index_of(cur_rd)].data_)); return true; } ipc::yield(k); @@ -116,8 +96,8 @@ struct prod_cons_impl> alignas(cache_line_size) std::atomic ct_; // commit index - template - bool push(W* /*wrapper*/, F&& f, E* elems) { + template + bool push(F&& f, E* elems) { circ::u2_t cur_ct, nxt_ct; for (unsigned k = 0;;) { cur_ct = ct_.load(std::memory_order_relaxed); @@ -153,16 +133,9 @@ struct prod_cons_impl> return true; } - template - bool force_push(W* wrapper, F&&, E*) { - wrapper->elems()->disconnect_receiver(1); - return false; - } - - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { - byte_t buff[DS]; + bool pop(circ::u2_t& /*cur*/, F&& f, E* elems) { for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); auto cur_wt = wt_.load(std::memory_order_acquire); @@ -179,15 +152,11 @@ struct prod_cons_impl> } k = 0; } - else { - std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); - if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { - std::forward(f)(buff); - std::forward(out)(true); - return true; - } - ipc::yield(k); + else if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { + std::forward(f)(&(elems[id_rd].data_)); + return true; } + ipc::yield(k); } } }; @@ -195,234 +164,147 @@ struct prod_cons_impl> template <> struct prod_cons_impl> { - using rc_t = std::uint64_t; - - enum : rc_t { - ep_mask = 0x00000000ffffffffull, - ep_incr = 0x0000000100000000ull - }; + using flag_t = std::uint64_t; template struct elem_t { std::aligned_storage_t data_ {}; - std::atomic rc_ { 0 }; // read-counter + std::atomic f_rc_ { 0 }; // read-flag }; alignas(cache_line_size) std::atomic wt_; // write index - alignas(cache_line_size) rc_t epoch_ { 0 }; // only one writer circ::u2_t cursor() const noexcept { return wt_.load(std::memory_order_acquire); } - template - bool push(W* wrapper, F&& f, E* elems) { - E* el; - for (unsigned k = 0;;) { - circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - el = elems + circ::index_of(wt_.load(std::memory_order_relaxed)); - // check all consumers have finished reading this element - auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & ep_mask; - if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch_)) { - return false; // has not finished yet - } - // consider rem_cc to be 0 here - if (el->rc_.compare_exchange_weak( - cur_rc, epoch_ | static_cast(cc), std::memory_order_release)) { - break; - } - ipc::yield(k); + template + bool push(F&& f, E* elems) { + E* el = elems + circ::index_of(wt_.load(std::memory_order_relaxed)); + auto cur_rc = el->f_rc_.exchange(~0ull, std::memory_order_acq_rel); + // check for consumers to read this element + if (cur_rc != 0) { + return false; // full } std::forward(f)(&(el->data_)); wt_.fetch_add(1, std::memory_order_release); return true; } - template - bool force_push(W* wrapper, F&& f, E* elems) { - E* el; - epoch_ += ep_incr; - for (unsigned k = 0;;) { - circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - el = elems + circ::index_of(wt_.load(std::memory_order_relaxed)); - // check all consumers have finished reading this element - auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & ep_mask; - if (cc & rem_cc) { - ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc); - cc = wrapper->elems()->disconnect_receiver(rem_cc); // disconnect all invalid readers - if (cc == 0) return false; // no reader - } - // just compare & exchange - if (el->rc_.compare_exchange_weak( - cur_rc, epoch_ | static_cast(cc), std::memory_order_release)) { - break; - } - ipc::yield(k); - } + template + bool pop(circ::u2_t& cur, F&& f, E* elems) { + if (cur == cursor()) return false; // empty + E* el = elems + circ::index_of(cur++); std::forward(f)(&(el->data_)); - wt_.fetch_add(1, std::memory_order_release); + el->f_rc_.store(0, std::memory_order_release); return true; } - - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) { - if (cur == cursor()) return false; // acquire - auto* el = elems + circ::index_of(cur++); - std::forward(f)(&(el->data_)); - for (unsigned k = 0;;) { - auto cur_rc = el->rc_.load(std::memory_order_acquire); - if ((cur_rc & ep_mask) == 0) { - std::forward(out)(true); - return true; - } - auto nxt_rc = cur_rc & ~static_cast(wrapper->connected_id()); - if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { - std::forward(out)((nxt_rc & ep_mask) == 0); - return true; - } - ipc::yield(k); - } - } }; template <> struct prod_cons_impl> { - using rc_t = std::uint64_t; using flag_t = std::uint64_t; - enum : rc_t { - rc_mask = 0x00000000ffffffffull, - ep_mask = 0x00ffffffffffffffull, - ep_incr = 0x0100000000000000ull, - ic_mask = 0xff000000ffffffffull, - ic_incr = 0x0000000100000000ull + enum : flag_t { + pushing = 1ull, + pushed = ~0ull, + popped = 0ull, }; template struct elem_t { std::aligned_storage_t data_ {}; - std::atomic rc_ { 0 }; // read-counter - std::atomic f_ct_ { 0 }; // commit flag + std::atomic f_rc_ { 0 }; // read-flag + std::atomic f_ct_ { 0 }; // commit-flag }; alignas(cache_line_size) std::atomic ct_; // commit index - alignas(cache_line_size) std::atomic epoch_ { 0 }; + alignas(cache_line_size) std::atomic wt_; // write index circ::u2_t cursor() const noexcept { - return ct_.load(std::memory_order_acquire); - } - - constexpr static rc_t inc_rc(rc_t rc) noexcept { - return (rc & ic_mask) | ((rc + ic_incr) & ~ic_mask); - } - - constexpr static rc_t inc_mask(rc_t rc) noexcept { - return inc_rc(rc) & ~rc_mask; + return wt_.load(std::memory_order_acquire); } - template - bool push(W* wrapper, F&& f, E* elems) { + template + bool push(F&& f, E* elems) { E* el; - circ::u2_t cur_ct; - rc_t epoch = epoch_.load(std::memory_order_acquire); + circ::u2_t cur_ct, nxt_ct; for (unsigned k = 0;;) { - circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); - // check all consumers have finished reading this element - auto cur_rc = el->rc_.load(std::memory_order_relaxed); - circ::cc_t rem_cc = cur_rc & rc_mask; - if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch)) { - return false; // has not finished yet - } - else if (!rem_cc) { - auto cur_fl = el->f_ct_.load(std::memory_order_acquire); - if ((cur_fl != cur_ct) && cur_fl) { - return false; // full + auto cac_ct = ct_.load(std::memory_order_relaxed); + cur_ct = cac_ct; + nxt_ct = cur_ct + 1; + el = elems + circ::index_of(cac_ct); + for (unsigned k = 0;;) { + auto cur_rc = el->f_rc_.load(std::memory_order_acquire); + switch (cur_rc) { + // helper + case pushing: + ct_.compare_exchange_strong(cac_ct, nxt_ct, std::memory_order_release); + goto try_next; + // full + case pushed: + return false; + // writable + default: + break; } + if (el->f_rc_.compare_exchange_weak(cur_rc, pushing, std::memory_order_release)) { + break; + } + ipc::yield(k); } - // consider rem_cc to be 0 here - if (el->rc_.compare_exchange_weak( - cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_relaxed) && - epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) { - break; - } + ct_.compare_exchange_strong(cac_ct, nxt_ct, std::memory_order_relaxed); + el->f_rc_.store(pushed, std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_release); + break; + try_next: ipc::yield(k); } - // only one thread/process would touch here at one time - ct_.store(cur_ct + 1, std::memory_order_release); std::forward(f)(&(el->data_)); // set flag & try update wt el->f_ct_.store(~static_cast(cur_ct), std::memory_order_release); - return true; - } - - template - bool force_push(W* wrapper, F&& f, E* elems) { - E* el; - circ::u2_t cur_ct; - rc_t epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr; - for (unsigned k = 0;;) { - circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); - // check all consumers have finished reading this element - auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & rc_mask; - if (cc & rem_cc) { - ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc); - cc = wrapper->elems()->disconnect_receiver(rem_cc); // disconnect all invalid readers - if (cc == 0) return false; // no reader + while (1) { + auto cac_ct = el->f_ct_.load(std::memory_order_acquire); + if (cur_ct != wt_.load(std::memory_order_relaxed)) { + return true; } - // just compare & exchange - if (el->rc_.compare_exchange_weak( - cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_relaxed)) { - if (epoch == epoch_.load(std::memory_order_acquire)) { - break; - } - else if (push(wrapper, std::forward(f), elems)) { - return true; - } - epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr; + if ((~cac_ct) != cur_ct) { + return true; } - ipc::yield(k); + if (!el->f_ct_.compare_exchange_strong(cac_ct, 0, std::memory_order_relaxed)) { + return true; + } + wt_.store(nxt_ct, std::memory_order_release); + cur_ct = nxt_ct; + nxt_ct = cur_ct + 1; + el = elems + circ::index_of(cur_ct); } - // only one thread/process would touch here at one time - ct_.store(cur_ct + 1, std::memory_order_release); - std::forward(f)(&(el->data_)); - // set flag & try update wt - el->f_ct_.store(~static_cast(cur_ct), std::memory_order_release); return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) { - auto* el = elems + circ::index_of(cur); - auto cur_fl = el->f_ct_.load(std::memory_order_acquire); - if (cur_fl != ~static_cast(cur)) { - return false; // empty - } - ++cur; - std::forward(f)(&(el->data_)); + template + bool pop(circ::u2_t& cur, F&& f, E(& elems)[N]) { for (unsigned k = 0;;) { - auto cur_rc = el->rc_.load(std::memory_order_acquire); - if ((cur_rc & rc_mask) == 0) { - std::forward(out)(true); - el->f_ct_.store(cur + N - 1, std::memory_order_release); - return true; - } - auto nxt_rc = inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()); - bool last_one = false; - if ((last_one = (nxt_rc & rc_mask) == 0)) { - el->f_ct_.store(cur + N - 1, std::memory_order_release); + auto cur_wt = wt_.load(std::memory_order_acquire); + auto id_rd = circ::index_of(cur); + auto id_wt = circ::index_of(cur_wt); + if (id_rd == id_wt) { + auto* el = elems + id_wt; + auto cac_ct = el->f_ct_.load(std::memory_order_acquire); + if ((~cac_ct) != cur_wt) { + return false; // empty + } + if (el->f_ct_.compare_exchange_weak(cac_ct, 0, std::memory_order_relaxed)) { + wt_.store(cur_wt + 1, std::memory_order_release); + } + k = 0; } - if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { - std::forward(out)(last_one); + else { + ++cur; + auto* el = elems + id_rd; + std::forward(f)(&(el->data_)); + el->f_rc_.store(popped, std::memory_order_release); return true; } ipc::yield(k); diff --git a/src/libipc/queue.h b/src/libipc/queue.h index f66860b8..5fe77a2e 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -158,27 +158,19 @@ class queue_base : public queue_conn { template bool push(F&& prep, P&&... params) { if (elems_ == nullptr) return false; - return elems_->push(this, [&](void* p) { + return elems_->push([&](void* p) { if (prep(p)) ::new (p) T(std::forward

(params)...); }); } - template - bool force_push(F&& prep, P&&... params) { - if (elems_ == nullptr) return false; - return elems_->force_push(this, [&](void* p) { - if (prep(p)) ::new (p) T(std::forward

(params)...); - }); - } - - template - bool pop(T& item, F&& out) { + template + bool pop(T& item) { if (elems_ == nullptr) { return false; } - return elems_->pop(this, &(this->cursor_), [&item](void* p) { + return elems_->pop(&(this->cursor_), [&item](void* p) { ::new (&item) T(std::move(*static_cast(p))); - }, std::forward(out)); + }); } }; @@ -198,18 +190,8 @@ class queue final : public detail::queue_base(std::forward

(params)...); } - template - bool force_push(P&&... params) { - return base_t::template force_push(std::forward

(params)...); - } - bool pop(T& item) { - return base_t::pop(item, [](bool) {}); - } - - template - bool pop(T& item, F&& out) { - return base_t::pop(item, std::forward(out)); + return base_t::pop(item); } }; diff --git a/src/libipc/waiter.h b/src/libipc/waiter.h index 2e13dc39..130bc984 100644 --- a/src/libipc/waiter.h +++ b/src/libipc/waiter.h @@ -62,12 +62,12 @@ class waiter { } bool notify() noexcept { - std::lock_guard{lock_}; // barrier + { IPC_UNUSED_ std::lock_guard guard {lock_}; } // barrier return cond_.notify(lock_); } bool broadcast() noexcept { - std::lock_guard{lock_}; // barrier + { IPC_UNUSED_ std::lock_guard guard {lock_}; } // barrier return cond_.broadcast(lock_); } diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index a1f01083..695e6743 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -73,11 +73,9 @@ void test_basic(char const * name) { que_t que1 { name }; EXPECT_FALSE(que1.send(test1)); - EXPECT_FALSE(que1.try_send(test2)); que_t que2 { que1.name(), ipc::receiver }; ASSERT_TRUE(que1.send(test1)); - ASSERT_TRUE(que1.try_send(test2)); EXPECT_EQ(que2.recv(), test1); EXPECT_EQ(que2.recv(), test2); diff --git a/test/test_queue.cpp b/test/test_queue.cpp index e85d39fa..1d2c7321 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -5,7 +5,6 @@ #include #include #include -#include // CHAR_BIT #include "libipc/prod_cons.h" #include "libipc/policy.h" @@ -143,6 +142,8 @@ TEST(Queue, el_connection) { for (std::size_t i = 0; i < 10000; ++i) { ASSERT_TRUE(el.connect_sender()); } + el.disconnect_sender(); + EXPECT_TRUE(el.connect_sender()); } { elems_t el; @@ -156,12 +157,13 @@ TEST(Queue, el_connection) { } { elems_t el; - for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { - ASSERT_NE(el.connect_receiver(), 0); - } + auto cc = el.connect_receiver(); + EXPECT_EQ(cc, 1); for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_EQ(el.connect_receiver(), 0); + ASSERT_NE(el.connect_receiver(), 0); } + EXPECT_EQ(el.disconnect_receiver(cc), 10000); + EXPECT_EQ(el.connect_receiver(), 10000 + cc); } } @@ -227,25 +229,18 @@ TEST(Queue, connection) { for (std::size_t i = 0; i < 10000; ++i) { ASSERT_TRUE(que.connect()); } - for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { - queue_t que{&el}; - ASSERT_TRUE(que.connect()); - } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - ASSERT_FALSE(que.connect()); + ASSERT_TRUE(que.connect()); } ASSERT_TRUE(que.disconnect()); for (std::size_t i = 0; i < 10000; ++i) { ASSERT_FALSE(que.disconnect()); } - { - queue_t que{&el}; - ASSERT_TRUE(que.connect()); - } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - ASSERT_FALSE(que.connect()); + ASSERT_TRUE(que.connect()); + ASSERT_TRUE(que.disconnect()); } } }