Skip to content

Commit

Permalink
[bugfix] faster snd thd destroy function, 0 wait ts
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Nov 9, 2024
1 parent 126f611 commit 9bc7679
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 6 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ int main() {

[2024.10.27 - v2.6.2 - Chunel]
* 优化参数互斥机制
* 修复辅助线程异常等待问题,修改辅助线程使用场景
* 修复辅助线程异常等待问题
* 更新`tutorial`内容

</details>
Expand Down
28 changes: 27 additions & 1 deletion src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ class UAtomicQueue : public UQueueObject {
*/
std::unique_ptr<T> popWithTimeout(CMSec ms) {
CGRAPH_UNIQUE_LOCK lk(mutex_);
if (!cv_.wait_for(lk, std::chrono::milliseconds(ms), [this] { return !queue_.empty(); })) {
if (!cv_.wait_for(lk, std::chrono::milliseconds(ms),
[this] { return (!queue_.empty()) || (!ready_flag_); })) {
return nullptr;
}

if (queue_.empty() || !ready_flag_) {
return nullptr;
}

Expand Down Expand Up @@ -135,10 +140,31 @@ class UAtomicQueue : public UQueueObject {
return queue_.empty();
}


/**
* 功能是通知所有的辅助线程停止工作
* @return
*/
CVoid reset() {
ready_flag_ = false;
cv_.notify_all();
}


/**
* 初始化状态
* @return
*/
CVoid setup() {
ready_flag_ = true;
queue_ = {};
}

CGRAPH_NO_ALLOWED_COPY(UAtomicQueue)

private:
std::queue<std::unique_ptr<T>> queue_ {}; // 任务队列
CBool ready_flag_ { true }; // 执行标记,主要用于快速释放 destroy 逻辑中,多个辅助线程等待的状态
};

CGRAPH_NAMESPACE_END
Expand Down
8 changes: 5 additions & 3 deletions src/UtilsCtrl/ThreadPool/UThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ CStatus UThreadPool::init() {
}
thread_record_map_.clear();
thread_record_map_[(CSize)std::hash<std::thread::id>{}(std::this_thread::get_id())] = CGRAPH_MAIN_THREAD_ID;
task_queue_.setup();
primary_threads_.reserve(config_.default_thread_size_);
for (int i = 0; i < config_.default_thread_size_; i++) {
auto* pt = CGRAPH_SAFE_MALLOC_COBJECT(UThreadPrimary); // 创建核心线程数
Expand All @@ -70,8 +71,8 @@ CStatus UThreadPool::init() {
* 参考: https://github.com/ChunelFeng/CGraph/issues/309
*/
for (int i = 0; i < config_.default_thread_size_; i++) {
status += primary_threads_[i]->init();
thread_record_map_[(CSize)std::hash<std::thread::id>{}(primary_threads_[i]->thread_.get_id())] = i;
status += primary_threads_[i]->init();
thread_record_map_[(CSize)std::hash<std::thread::id>{}(primary_threads_[i]->thread_.get_id())] = i;
}
CGRAPH_FUNCTION_CHECK_STATUS

Expand Down Expand Up @@ -162,6 +163,7 @@ CStatus UThreadPool::destroy() {
primary_threads_.clear();

// secondary 线程是智能指针,不需要delete
task_queue_.reset();
for (auto &st : secondary_threads_) {
status += st->destroy();
}
Expand Down Expand Up @@ -206,7 +208,7 @@ CIndex UThreadPool::dispatch(CIndex origIndex) {
CIndex realIndex = 0;
if (CGRAPH_DEFAULT_TASK_STRATEGY == origIndex) {
realIndex = cur_index_++;
if (cur_index_ >= config_.default_thread_size_ || cur_index_ < 0) {
if (cur_index_ >= config_.max_thread_size_ || cur_index_ < 0) {
cur_index_ = 0;
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ static const CMSec CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL = 1000;
static const CSec CGRAPH_SECONDARY_THREAD_TTL = 10; // 辅助线程ttl,单位为s
static const CBool CGRAPH_MONITOR_ENABLE = false; // 是否开启监控程序
static const CSec CGRAPH_MONITOR_SPAN = 5; // 监控线程执行间隔,单位为s
static const CMSec CGRAPH_QUEUE_EMPTY_INTERVAL = 50; // 队列为空时,等待的时间。仅针对辅助线程,单位为ms
static const CMSec CGRAPH_QUEUE_EMPTY_INTERVAL = 1000; // 队列为空时,等待的时间。仅针对辅助线程,单位为ms
static const CBool CGRAPH_BIND_CPU_ENABLE = false; // 是否开启绑定cpu模式(仅针对主线程)
static const CInt CGRAPH_PRIMARY_THREAD_POLICY = CGRAPH_THREAD_SCHED_OTHER; // 主线程调度策略
static const CInt CGRAPH_SECONDARY_THREAD_POLICY = CGRAPH_THREAD_SCHED_OTHER; // 辅助线程调度策略
Expand Down

0 comments on commit 9bc7679

Please sign in to comment.