Skip to content

Commit

Permalink
WorkQueue: PointerWQ drain no longer waits for other queues
Browse files Browse the repository at this point in the history
If another (independent) queue was processing, drain could
block waiting.  Instead, allow drain to exit quickly if
no items are being processed and the queue is empty for
the current WQ.

Signed-off-by: Jason Dillaman <[email protected]>
  • Loading branch information
Jason Dillaman committed Nov 17, 2015
1 parent a3aa565 commit b118d7d
Showing 1 changed file with 19 additions and 1 deletion.
20 changes: 19 additions & 1 deletion src/common/WorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,20 +354,33 @@ class ThreadPool : public md_config_obs_t {
class PointerWQ : public WorkQueue_ {
public:
PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(n, ti, sti), m_pool(p) {
: WorkQueue_(n, ti, sti), m_pool(p), m_processing(0) {
m_pool->add_work_queue(this);
}
~PointerWQ() {
m_pool->remove_work_queue(this);
assert(m_processing == 0);
}
void drain() {
{
// if this queue is empty and not processing, don't wait for other
// queues to finish processing
Mutex::Locker l(m_pool->_lock);
if (m_processing == 0 && m_items.empty()) {
return;
}
}
m_pool->drain(this);
}
void queue(T *item) {
Mutex::Locker l(m_pool->_lock);
m_items.push_back(item);
m_pool->_cond.SignalOne();
}
bool empty() {
Mutex::Locker l(m_pool->_lock);
return _empty();
}
protected:
virtual void _clear() {
assert(m_pool->_lock.is_locked());
Expand All @@ -383,6 +396,7 @@ class ThreadPool : public md_config_obs_t {
return NULL;
}

++m_processing;
T *item = m_items.front();
m_items.pop_front();
return item;
Expand All @@ -391,6 +405,9 @@ class ThreadPool : public md_config_obs_t {
process(reinterpret_cast<T *>(item));
}
virtual void _void_process_finish(void *item) {
assert(m_pool->_lock.is_locked());
assert(m_processing > 0);
--m_processing;
}

virtual void process(T *item) = 0;
Expand All @@ -409,6 +426,7 @@ class ThreadPool : public md_config_obs_t {
private:
ThreadPool *m_pool;
std::list<T *> m_items;
uint32_t m_processing;
};
private:
vector<WorkQueue_*> work_queues;
Expand Down

0 comments on commit b118d7d

Please sign in to comment.