Skip to content

Commit

Permalink
Merge pull request yuwenyong#20 from yuwenyong/dev
Browse files Browse the repository at this point in the history
v3.3.0
  • Loading branch information
yuwenyong authored Jan 6, 2019
2 parents 3dd18b6 + 12a948f commit d0bf54c
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 32 deletions.
119 changes: 89 additions & 30 deletions example/helloworld/helloworld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,36 +164,95 @@ class HelloWorldApp: public AppBootstrapper {
int main (int argc, char **argv) {
// HelloWorldApp app;
// app.run(argc, argv);
Person p1{"testName", 21, "M", 167.5};
Person p2;

Archive<> a1;
Archive<ByteOrderNetwork> a2;
Archive<ByteOrderBigEndian> a3;
Archive<ByteOrderLittleEndian> a4;

std::cout << "Archive test start" << std::endl;
p1.display();

std::cout << "Archive Native" << std::endl;
a1 << p1;
a1 >> p2;
p2.display();

std::cout << "Archive Network" << std::endl;
a2 << p1;
a2 >> p2;
p2.display();

std::cout << "Archive BE" << std::endl;
a3 << p1;
a3 >> p2;
p2.display();

std::cout << "Archive LE" << std::endl;
a4 << p1;
a4 >> p2;
p2.display();

TaskPool taskPool;
taskPool.start(4);
std::cerr << "Started" << std::endl;
auto f1 = taskPool.submit([]() {
std::cerr << "First task" << std::endl;
std::cerr << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{5});
std::cerr << "First task completed" << std::endl;
return 17;
});

auto f2 = taskPool.submit([]() {
std::cerr << "Second task" << std::endl;
std::cerr << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{2});
std::cerr << "Second task completed" << std::endl;
return 18.1;
});

auto f3 = taskPool.submit([]() {
std::cerr << "Third task" << std::endl;
std::cerr << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{3});
std::cerr << "Third task completed" << std::endl;
return std::string{"abc"};
});

auto f4 = taskPool.submit([]() {
std::cerr << "Forth task" << std::endl;
std::cerr << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{1});
std::cerr << "Forth task completed" << std::endl;
return 4;
});

auto f5 = taskPool.submit([]() {
std::cerr << "Fifth task" << std::endl;
std::cerr << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{3});
std::cerr << "Fifth task completed" << std::endl;
return 5;
});

auto val1 = f1.get();
auto val2 = f2.get();
auto val3 = f3.get();
auto val4 = f4.get();
auto val5 = f5.get();
std::cerr << "First result:" << val1 << std::endl;
std::cerr << "Second result:" << val2 << std::endl;
std::cerr << "Third result:" << val3 << std::endl;
std::cerr << "Forth result:" << val4 << std::endl;
std::cerr << "Fifth result:" << val5 << std::endl;

taskPool.stop();
taskPool.wait();
std::cerr << "Completed" << std::endl;

// Person p1{"testName", 21, "M", 167.5};
// Person p2;
//
// Archive<> a1;
// Archive<ByteOrderNetwork> a2;
// Archive<ByteOrderBigEndian> a3;
// Archive<ByteOrderLittleEndian> a4;
//
// std::cout << "Archive test start" << std::endl;
// p1.display();
//
// std::cout << "Archive Native" << std::endl;
// a1 << p1;
// a1 >> p2;
// p2.display();
//
// std::cout << "Archive Network" << std::endl;
// a2 << p1;
// a2 >> p2;
// p2.display();
//
// std::cout << "Archive BE" << std::endl;
// a3 << p1;
// a3 >> p2;
// p2.display();
//
// std::cout << "Archive LE" << std::endl;
// a4 << p1;
// a4 >> p2;
// p2.display();

// auto d1 = makeDeferred();
// d1->addCallback([](DeferredValue value) {
Expand Down
2 changes: 1 addition & 1 deletion src/net4cxx/common/define.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ NS_END
#endif


#define NET4CXX_VERSION "3.2.1"
#define NET4CXX_VERSION "3.3.0"
#define NET4CXX_VER "net4cxx/" NET4CXX_VERSION


Expand Down
10 changes: 10 additions & 0 deletions src/net4cxx/common/threading/concurrentqueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//
// Created by yuwenyong.vincent on 2019-01-06.
//

#include "net4cxx/common/threading/concurrentqueue.h"


NS_BEGIN

NS_END
137 changes: 137 additions & 0 deletions src/net4cxx/common/threading/concurrentqueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//
// Created by yuwenyong.vincent on 2019-01-06.
//

#ifndef NET4CXX_COMMON_THREADING_CONCURRENTQUEUE_H
#define NET4CXX_COMMON_THREADING_CONCURRENTQUEUE_H

#include "net4cxx/common/common.h"


NS_BEGIN


template <typename ValueT>
class ConcurrentQueue {
public:
bool empty() const {
std::lock_guard<std::mutex> lock(_mut);
return _queue.empty();
}

size_t size() const {
std::lock_guard<std::mutex> lock(_mut);
return _queue.size();
}

bool enqueue(ValueT &&val) {
std::lock_guard<std::mutex> lock(_mut);
if (_terminated || _stopped) {
return false;
}
_queue.emplace_back(std::forward<ValueT>(val));
_cond.notify_one();
return true;
}

bool dequeue(ValueT &val) {
bool res{false};
std::unique_lock<std::mutex> lock(_mut);
do {
if (_terminated) {
break;
} else if (!_queue.empty()) {
val = std::move(_queue.front());
_queue.pop_front();
res = true;
break;
} else if (_stopped) {
break;
}
_cond.wait(lock);
} while (true);
return res;
}

std::shared_ptr<ValueT> dequeue() {
std::shared_ptr<ValueT> res;
std::unique_lock<std::mutex> lock(_mut);
do {
if (_terminated) {
break;
} else if (!_queue.empty()) {
res = std::make_shared<ValueT>(std::move(_queue.front()));
_queue.pop_front();
break;
} else if (_stopped) {
break;
}
_cond.wait(lock);
} while (true);
return res;
}

bool tryDequeue(ValueT &val) {
std::lock_guard<std::mutex> lock(_mut);
if (_terminated) {
return false;
}
if (!_queue.empty()) {
val = std::move(_queue.front());
_queue.pop_front();
return true;
}
return false;
}

std::shared_ptr<ValueT> tryDequeue() {
std::lock_guard<std::mutex> lock(_mut);
if (_terminated) {
return nullptr;
}
if (!_queue.empty()) {
auto res = std::make_shared<ValueT>(std::move(_queue.front()));
_queue.pop_front();
return res;
}
return nullptr;
}

void stop() {
std::lock_guard<std::mutex> lock(_mut);
_stopped = true;
_cond.notify_all();
}

bool stopped() const {
std::lock_guard<std::mutex> lock(_mut);
return _stopped;
}

void terminate() {
std::lock_guard<std::mutex> lock(_mut);
_terminated = true;
_cond.notify_all();
}

bool terminated() const {
std::lock_guard<std::mutex> lock(_mut);
return _terminated;
}

void reset() {
std::lock_guard<std::mutex> lock(_mut);
_stopped = false;
_terminated = false;
}
protected:
mutable std::mutex _mut;
std::condition_variable _cond;
std::deque<ValueT> _queue;
bool _stopped{false};
bool _terminated{false};
};

NS_END

#endif //NET4CXX_COMMON_THREADING_CONCURRENTQUEUE_H
64 changes: 64 additions & 0 deletions src/net4cxx/common/threading/taskpool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//
// Created by yuwenyong.vincent on 2019-01-06.
//

#include "net4cxx/common/threading/taskpool.h"


NS_BEGIN

bool TaskPool::start(size_t threadCount) {
if (threadCount == 0) {
return false;
}
if (!_threads.empty()) {
return false;
}
if (_stopped || _terminated) {
return false;
}
for (size_t i = 0; i != threadCount; ++i) {
_threads.emplace_back(std::thread([this](){
process();
}));
}
return true;
}

void TaskPool::process() {
std::unique_ptr<TaskBase> task;
while (true) {
task = pop();
if (!task) {
break;
}
task->execute();
task.reset();
}
}

std::unique_ptr<TaskPool::TaskBase> TaskPool::pop() {
std::unique_ptr<TaskBase> res;
std::unique_lock<std::mutex> lock(_mut);
do {
if (_terminated) {
break;
} else if (!_taskList.empty()) {
res.reset(&_taskList.front());
_taskList.pop_front();
break;
} else if (_stopped) {
break;
}
_cond.wait(lock);
} while (true);
return res;
}

void TaskPool::push(std::unique_ptr<TaskBase> &&task) {
std::lock_guard<std::mutex> lock(_mut);
_taskList.push_back(*task.release());
_cond.notify_one();
}

NS_END
Loading

0 comments on commit d0bf54c

Please sign in to comment.