Skip to content

Commit

Permalink
add GlobalQueue implement
Browse files Browse the repository at this point in the history
  • Loading branch information
fs committed Mar 7, 2024
1 parent 8b175c6 commit d58a516
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 1 deletion.
41 changes: 40 additions & 1 deletion c++/async_task/src/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,49 @@ std::shared_ptr<Task> TaskQueue::try_deque() {
if (_queue.empty()) {
return nullptr;
}

auto task = _queue.front();
_queue.pop();
return task;
}

const std::shared_ptr<TaskQueue>& GlobalQueue::get_main_queue() {
static std::shared_ptr<TaskQueue> g_main_queue = std::make_shared<TaskQueue>("main_queue");
return g_main_queue;
}

std::shared_ptr<TaskQueue> GlobalQueue::get_queue_by_name(const std::string &queue_name){
auto it = _queue_map.find(queue_name);
if (it != _queue_map.end()) {
return it->second;
} else {
return nullptr;
}
}


// 获取默认的全局后台队列
std::shared_ptr<TaskQueue> GlobalQueue::get_background_queue() {
static std::shared_ptr<TaskQueue> background_queue = std::make_shared<TaskQueue>("background");
return background_queue;
}

void GlobalQueue::init_queues(const std::vector<std::string>& queue_names) {
get_main_queue();
get_background_queue();
for (auto& name : queue_names) {
init_queue(name);
}
}

bool GlobalQueue::init_queue(const std::string& queue_name) {
if(get_queue_by_name(queue_name) != nullptr) {
return false;
}
auto queue = std::make_shared<TaskQueue>(queue_name);
_queue_map[queue_name] = queue;
return true;

}

}
25 changes: 25 additions & 0 deletions c++/async_task/src/task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,31 @@ class TaskQueue : public std::enable_shared_from_this<TaskQueue> {
DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};

class GlobalQueue {
public:
// 获取主队列
static const std::shared_ptr<TaskQueue>& get_main_queue();

static bool is_main_queue(const std::shared_ptr<TaskQueue>& queue) { return queue == get_main_queue(); }

// 根据名称获取子队列,如果队列不存在,则返回nullptr
static std::shared_ptr<TaskQueue> get_queue_by_name(const std::string &queue_name);

// 获取默认的全局后台队列
static std::shared_ptr<TaskQueue> get_background_queue();

// 初始化所有队列
static void init_queues(const std::vector<std::string>& queue_names);
// 初始化某个队列
static bool init_queue(const std::string& queue_name);

private:
GlobalQueue() {};
~GlobalQueue() {};
static std::map<std::string, std::shared_ptr<TaskQueue>> _queue_map;
DISALLOW_COPY_AND_ASSIGN(GlobalQueue);
};

}

#endif
49 changes: 49 additions & 0 deletions c++/async_task/src/work_thread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "work_thread.h"
#include "work_queue.h"
#include "task.h"

#include <iostream>

namespace async_framework
{
std::thread::id WorkThread::_s_main_thread_id;

void WorkThread::main_loop() {
while (_running) {
process_one_task(true);
}
}

bool WorkThread::process_one_task(bool wait) {
std::shared_ptr<Task> task;
if (!wait) {
//非阻塞
task = _queue->try_deque( );
if (task == nullptr) {
return false;
}
} else {
// 获取任务,如果没有任务,则阻塞当前线程
task = _queue->dequeue();
}

if (task == nullptr) {
return false;
}
bool immediate = true;
task->execute(immediate);
return true;
}

void WorkThread::start(bool new_os_thread) {
_running .store(true);
if (new_os_thread) {
_thread = std::make_shared<std::thread>(&WorkThread::main_loop, this);
auto native_thread = _thread->native_handle();
pthread_setname_np(native_thread, _name.c_str());
} else {
main_loop();
}
}

} // namespace async_framework
50 changes: 50 additions & 0 deletions c++/async_task/src/work_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#ifndef WORK_THREAD_H
#define WORK_THREAD_H

#include <string>
#include <memory>
#include <atomic>
#include <thread>

#include "utils/utils_define.h"
#include "task.h"
#include "task_queue.h"

namespace async_framework {

class WorkThread {
public:
WorkThread(const std::string &name, const std::shared_ptr<TaskQueue>& queue)
: _name(name)
, _queue(queue){};

virtual ~WorkThread();
void start(bool new_os_thread = true);

// 判断当前正在执行的线程是否是主线程
static bool is_main_thread() {
return _s_main_thread_id == std::this_thread::get_id();
}

static void set_main_thread_id(const std::thread::id& main_thread_id) {
_s_main_thread_id = main_thread_id;
}
protected:
// 从队列中获取一个任务并处理,返回是否获取到任务,wait表示如果没有任务是否阻塞当前线程
virtual bool process_one_task(bool wait = false);

// 线程的主循环, 默认实现为如果队列为空,则阻塞,否则处理任务
virtual void main_loop();
protected:
std::string _name;
std::shared_ptr<TaskQueue> _queue;
std::atomic<bool> _running;
std::shared_ptr<std::thread> _thread; //
static std::thread::id _s_main_thread_id; // _s_main_thread_id 所有对象共享,记录主线程id
private:
DISALLOW_COPY_AND_ASSIGN(WorkThread);
};

}

#endif

0 comments on commit d58a516

Please sign in to comment.