Skip to content

Commit

Permalink
add TaskQueue implement of multi-thread version2
Browse files Browse the repository at this point in the history
  • Loading branch information
fs committed Mar 7, 2024
1 parent 08815af commit 8b175c6
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
42 changes: 42 additions & 0 deletions c++/async_task/src/task_queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "task_queue.h"

#include <thread>
#include <iostream>

namespace async_framework {

void TaskQueue::enqueue(std::shared_ptr<Task> task) {
std::unique_lock<std::mutex> lock(_queue_mutex);
_queue.emplace(task);
task->set_work_queue(this->shared_from_this());
_queue_not_empty.notify_one();// 通知隊列不空
}

std::shared_ptr<Task> TaskQueue::dequeue() {
std::unique_lock<std::mutex> lock(_queue_mutex);

// while(_queue.empty()) {
// _queue_not_empty.wait(lock);
// }

_queue_not_empty.wait(lock, [this] {
return !_queue.empty();
});

auto task = _queue.front();
_queue.pop();
return task;
}

std::shared_ptr<Task> TaskQueue::try_deque() {
std::unique_lock<std::mutex> lock(_queue_mutex);
if (_queue.empty()) {
return nullptr;
}

auto task = _queue.front();
_queue.pop();
return task;
}

}
44 changes: 44 additions & 0 deletions c++/async_task/src/task_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#ifndef TASK_QUEUE_H
#define TASK_QUEUE_H

#include <string>
#include <memory>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <map>

#include "utils/utils_define.h"
#include "task.h"

namespace async_framework {

class TaskQueue : public std::enable_shared_from_this<TaskQueue> {
public:
TaskQueue(const std::string& queue_name) : _queue_name(queue_name) {}
~TaskQueue() = default;

// 任务入队
void enqueue(std::shared_ptr<Task> task);

// 任务出队,如果没有任务则阻塞
std::shared_ptr<Task> dequeue();

// 非阻塞版本, 如果队列为空,则返回nullptr
std::shared_ptr<Task> try_deque();

const std::string& get_queue_name() const { return _queue_name; }
private:
std::string _queue_name;

std::queue<std::shared_ptr<Task>> _queue;

std::mutex _queue_mutex;
std::condition_variable _queue_not_empty;

DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};

}

#endif

0 comments on commit 8b175c6

Please sign in to comment.