Skip to content

Commit

Permalink
Added support for timeouts in blocking version of queue (see issues c…
Browse files Browse the repository at this point in the history
  • Loading branch information
cameron314 committed May 11, 2016
1 parent 9fa0b22 commit 0da535c
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 30 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,27 @@ front = q.peek();
assert(front == nullptr); // Returns nullptr if the queue was empty
```
The blocking version has the exact same API, with the addition of a `wait_dequeue` method:
The blocking version has the exact same API, with the addition of `wait_dequeue` and
`wait_dequeue_timed` methods:
```cpp
BlockingReaderWriterQueue<int> q;
std::thread reader([&]() {
int item;
for (int i = 0; i != 100; ++i) {
// Fully-blocking:
q.wait_dequeue(item);
// Blocking with timeout
if (q.wait_dequeue_timed(item, std::chrono::milliseconds(5)))
++i;
}
});
std::thread writer([&]() {
for (int i = 0; i != 100; ++i) {
q.enqueue(i);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
writer.join();
Expand Down
99 changes: 92 additions & 7 deletions atomicops.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ©2013-2015 Cameron Desrochers.
// ©2013-2016 Cameron Desrochers.
// Distributed under the simplified BSD license (see the license file that
// should have come with this header).
// Uses Jeff Preshing's semaphore implementation (under the terms of its
Expand Down Expand Up @@ -393,6 +393,18 @@ namespace moodycamel
WaitForSingleObject(m_hSema, infinite);
}

bool try_wait()
{
const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
}

bool timed_wait(std::uint64_t usecs)
{
const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT;
}

void signal(int count = 1)
{
ReleaseSemaphore(m_hSema, count, nullptr);
Expand Down Expand Up @@ -428,6 +440,23 @@ namespace moodycamel
semaphore_wait(m_sema);
}

bool try_wait()
{
return timed_wait(0);
}

bool timed_wait(std::int64_t timeout_usecs)
{
mach_timespec_t ts;
ts.tv_sec = timeout_usecs / 1000000;
ts.tv_nsec = (timeout_usecs % 1000000) * 1000;

// added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
kern_return_t rc = semaphore_timedwait(m_sema, ts);

return rc != KERN_OPERATION_TIMED_OUT;
}

void signal()
{
semaphore_signal(m_sema);
Expand Down Expand Up @@ -476,6 +505,37 @@ namespace moodycamel
while (rc == -1 && errno == EINTR);
}

bool try_wait()
{
int rc;
do {
rc = sem_trywait(&m_sema);
} while (rc == -1 && errno == EINTR);
return !(rc == -1 && errno == EAGAIN);
}

bool timed_wait(std::uint64_t usecs)
{
struct timespec ts;
const int usecs_in_1_sec = 1000000;
const int nsecs_in_1_sec = 1000000000;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += usecs / usecs_in_1_sec;
ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
// sem_timedwait bombs if you have more than 1e9 in tv_nsec
// so we have to clean things up before passing it in
if (ts.tv_nsec > nsecs_in_1_sec) {
ts.tv_nsec -= nsecs_in_1_sec;
++ts.tv_sec;
}

int rc;
do {
rc = sem_timedwait(&m_sema, &ts);
} while (rc == -1 && errno == EINTR);
return !(rc == -1 && errno == ETIMEDOUT);
}

void signal()
{
sem_post(&m_sema);
Expand Down Expand Up @@ -505,7 +565,7 @@ namespace moodycamel
weak_atomic<ssize_t> m_count;
Semaphore m_sema;

void waitWithPartialSpinning()
bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
{
ssize_t oldCount;
// Is there a better way to set the initial spin count?
Expand All @@ -517,15 +577,35 @@ namespace moodycamel
if (m_count.load() > 0)
{
m_count.fetch_add_acquire(-1);
return;
return true;
}
compiler_fence(memory_order_acquire); // Prevent the compiler from collapsing the loop.
}
oldCount = m_count.fetch_add_acquire(-1);
if (oldCount <= 0)
{
m_sema.wait();
}
if (oldCount > 0)
return true;
if (timeout_usecs < 0)
{
m_sema.wait();
return true;
}
if (m_sema.timed_wait(timeout_usecs))
return true;
// At this point, we've timed out waiting for the semaphore, but the
// count is still decremented indicating we may still be waiting on
// it. So we have to re-adjust the count, but only if the semaphore
// wasn't signaled enough times for us too since then. If it was, we
// need to release the semaphore too.
while (true)
{
oldCount = m_count.fetch_add_release(1);
if (oldCount < 0)
return false; // successfully restored things to the way they were
// Oh, the producer thread just signaled the semaphore after all. Try again:
oldCount = m_count.fetch_add_acquire(-1);
if (oldCount >= 0 && m_sema.try_wait())
return true;
}
}

public:
Expand All @@ -550,6 +630,11 @@ namespace moodycamel
waitWithPartialSpinning();
}

bool wait(std::int64_t timeout_usecs)
{
return tryWait() || waitWithPartialSpinning(timeout_usecs);
}

void signal(ssize_t count = 1)
{
assert(count >= 0);
Expand Down
38 changes: 38 additions & 0 deletions readerwriterqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
#include <stdexcept>
#include <cstdint>
#include <cstdlib> // For malloc/free/abort & size_t
#if __cplusplus > 199711L
#include <chrono>
#endif


// A lock-free queue for a single-consumer, single-producer architecture.
Expand Down Expand Up @@ -727,6 +730,41 @@ class BlockingReaderWriterQueue
}


// Attempts to dequeue an element; if the queue is empty,
// waits until an element is available up to the specified timeout,
// then dequeues it and returns true, or returns false if the timeout
// expires before an element can be dequeued.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue.
template<typename U>
bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs)
{
if (!sema.wait(timeout_usecs)) {
return false;
}
bool success = inner.try_dequeue(result);
AE_UNUSED(result);
assert(success);
AE_UNUSED(success);
return true;
}


#if __cplusplus > 199711L
// Attempts to dequeue an element; if the queue is empty,
// waits until an element is available up to the specified timeout,
// then dequeues it and returns true, or returns false if the timeout
// expires before an element can be dequeued.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue.
template<typename U, typename Rep, typename Period>
inline bool wait_dequeue_timed(U& result, std::chrono::duration<Rep, Period> const& timeout)
{
return wait_dequeue_timed(result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
}
#endif


// Returns a pointer to the front element in the queue (the one that
// would be removed next by a call to `try_dequeue` or `pop`). If the
// queue appears empty at the time the method is called, nullptr is
Expand Down
80 changes: 58 additions & 22 deletions tests/unittests/unittests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,30 +456,66 @@ class ReaderWriterQueueTests : public TestClass<ReaderWriterQueueTests>
weak_atomic<int> result;
result = 1;

BlockingReaderWriterQueue<int> q(100);
SimpleThread reader([&]() {
int item = -1;
int prevItem = -1;
for (int i = 0; i != 1000000; ++i) {
q.wait_dequeue(item);
if (item <= prevItem) {
result = 0;
{
BlockingReaderWriterQueue<int> q(100);
SimpleThread reader([&]() {
int item = -1;
int prevItem = -1;
for (int i = 0; i != 1000000; ++i) {
q.wait_dequeue(item);
if (item <= prevItem) {
result = 0;
}
prevItem = item;
}
prevItem = item;
}
});
SimpleThread writer([&]() {
for (int i = 0; i != 1000000; ++i) {
q.enqueue(i);
}
});

writer.join();
reader.join();

ASSERT_OR_FAIL(q.size_approx() == 0);
});
SimpleThread writer([&]() {
for (int i = 0; i != 1000000; ++i) {
q.enqueue(i);
}
});

writer.join();
reader.join();

ASSERT_OR_FAIL(q.size_approx() == 0);
ASSERT_OR_FAIL(result.load());
}

{
BlockingReaderWriterQueue<int> q(100);
SimpleThread reader([&]() {
int item = -1;
int prevItem = -1;
for (int i = 0; i != 1000000; ++i) {
if (!q.wait_dequeue_timed(item, 1000)) {
--i;
continue;
}
if (item <= prevItem) {
result = 0;
}
prevItem = item;
}
});
SimpleThread writer([&]() {
for (int i = 0; i != 1000000; ++i) {
q.enqueue(i);
for (volatile int x = 0; x != 100; ++x);
}
});

writer.join();
reader.join();

int item;
ASSERT_OR_FAIL(q.size_approx() == 0);
ASSERT_OR_FAIL(!q.wait_dequeue_timed(item, 0));
ASSERT_OR_FAIL(!q.wait_dequeue_timed(item, 1));
ASSERT_OR_FAIL(result.load());
}

return result.load() ? 1 : 0;
return true;
}
};

Expand Down

0 comments on commit 0da535c

Please sign in to comment.