Skip to content

Commit faade8a

Browse files
committed
Cleaned up thread_pool so that I can read it more clearly.
1 parent 8df1f1c commit faade8a

File tree

2 files changed

+109
-113
lines changed

2 files changed

+109
-113
lines changed

concurrency/src/network/concurrency/thread_pool.hpp

+34-32
Original file line numberDiff line numberDiff line change
@@ -16,38 +16,40 @@
1616
#include <boost/asio/io_service.hpp>
1717

1818
namespace network {
19-
namespace concurrency {
20-
21-
typedef std::shared_ptr<boost::asio::io_service> io_service_ptr;
22-
typedef std::shared_ptr<std::vector<std::thread>> worker_threads_ptr;
23-
typedef std::shared_ptr<boost::asio::io_service::work> sentinel_ptr;
24-
25-
struct thread_pool_pimpl;
26-
27-
struct thread_pool {
28-
thread_pool(
29-
std::size_t threads = 1,
30-
io_service_ptr io_service = io_service_ptr(),
31-
std::vector<std::thread> worker_threads = std::vector<std::thread>());
32-
#if !defined(BOOST_NO_CXX11_DELETED_FUNCTIONS)
33-
thread_pool(thread_pool const&) = delete;
34-
#endif // !defined(BOOST_NO_CXX11_DELETEED_FUNCTIONS)
35-
thread_pool(thread_pool && other);
36-
#if !defined(BOOST_NO_CXX11_DELETED_FUNCTIONS)
37-
thread_pool& operator=(thread_pool const&) = delete;
38-
#endif // !defined(BOOST_NO_CXX11_DELETEED_FUNCTIONS)
39-
thread_pool& operator=(thread_pool && other);
40-
std::size_t const thread_count() const;
41-
void post(std::function<void()> f);
42-
~thread_pool();
43-
void swap(thread_pool& other);
44-
protected:
45-
thread_pool_pimpl* pimpl;
46-
};
47-
48-
inline void swap(thread_pool& l, thread_pool& r) { l.swap(r); }
49-
50-
} // namespace concurrency
19+
namespace concurrency {
20+
21+
typedef std::shared_ptr<boost::asio::io_service> io_service_ptr;
22+
typedef std::shared_ptr<std::vector<std::thread>> worker_threads_ptr;
23+
typedef std::shared_ptr<boost::asio::io_service::work> sentinel_ptr;
24+
25+
struct thread_pool {
26+
thread_pool(std::size_t threads = 1,
27+
io_service_ptr io_service = io_service_ptr(),
28+
std::vector<std::thread> worker_threads = std::vector<std::thread>());
29+
thread_pool(thread_pool const&) = delete;
30+
thread_pool(thread_pool && other);
31+
~thread_pool();
32+
33+
thread_pool& operator=(thread_pool const&) = delete;
34+
thread_pool& operator=(thread_pool && other);
35+
36+
std::size_t const thread_count() const;
37+
void post(std::function<void()> f);
38+
void swap(thread_pool& other);
39+
40+
private:
41+
42+
struct pimpl;
43+
pimpl* pimpl_;
44+
45+
};
46+
47+
inline
48+
void swap(thread_pool& l, thread_pool& r) {
49+
l.swap(r);
50+
}
51+
52+
} // namespace concurrency
5153
} // namespace network
5254

5355
#endif /* NETWORK_CONCURRENCY_THREAD_POOL_HPP_20101020 */

concurrency/src/network/concurrency/thread_pool.ipp

+75-81
Original file line numberDiff line numberDiff line change
@@ -14,99 +14,93 @@
1414
#include <boost/scope_exit.hpp>
1515

1616
namespace network {
17-
namespace concurrency {
18-
19-
struct thread_pool_pimpl {
20-
thread_pool_pimpl(
21-
std::size_t threads = 1,
22-
io_service_ptr io_service = io_service_ptr(),
23-
std::vector<std::thread> worker_threads = std::vector<std::thread>())
24-
: threads_(threads),
25-
io_service_(io_service),
26-
worker_threads_(std::move(worker_threads)),
27-
sentinel_() {
28-
bool commit = false;
29-
BOOST_SCOPE_EXIT((&commit)(&io_service_)(&worker_threads_)(&sentinel_)) {
30-
if (!commit) {
31-
sentinel_.reset();
32-
io_service_.reset();
33-
for (auto& thread : worker_threads_)
34-
if (thread.joinable())
35-
thread.join();
36-
worker_threads_.clear();
17+
namespace concurrency {
18+
19+
struct thread_pool::pimpl {
20+
pimpl(std::size_t threads = 1,
21+
io_service_ptr io_service = io_service_ptr(),
22+
std::vector<std::thread> worker_threads = std::vector<std::thread>())
23+
: threads_(threads),
24+
io_service_(io_service),
25+
worker_threads_(std::move(worker_threads)),
26+
sentinel_() {
27+
bool commit = false;
28+
29+
BOOST_SCOPE_EXIT((&commit)(&io_service_)(&worker_threads_)(&sentinel_)) {
30+
if (!commit) {
31+
sentinel_.reset();
32+
io_service_.reset();
33+
for (auto& thread : worker_threads_) {
34+
if (thread.joinable()) {
35+
thread.join();
36+
}
37+
}
38+
worker_threads_.clear();
39+
}
40+
}
41+
BOOST_SCOPE_EXIT_END
42+
43+
if (!io_service_.get()) {
44+
io_service_.reset(new boost::asio::io_service);
45+
}
46+
47+
if (!sentinel_.get()) {
48+
sentinel_.reset(new boost::asio::io_service::work(*io_service_));
49+
}
50+
51+
auto local_io_service = io_service_;
52+
for (std::size_t counter = 0; counter < threads_; ++counter) {
53+
worker_threads_.emplace_back([local_io_service]() {
54+
local_io_service->run();
55+
});
56+
}
57+
58+
commit = true;
3759
}
38-
}
39-
BOOST_SCOPE_EXIT_END if (!io_service_.get())
40-
io_service_.reset(new boost::asio::io_service);
41-
if (!sentinel_.get())
42-
sentinel_.reset(new boost::asio::io_service::work(*io_service_));
43-
auto local_io_service = io_service_;
44-
for (std::size_t counter = 0; counter < threads_; ++counter)
45-
worker_threads_.emplace_back([local_io_service]() {
46-
local_io_service->run();
47-
});
48-
49-
commit = true;
50-
}
5160

52-
#if !defined(BOOST_NO_CXX11_DEFAULTED_FUNCTIONS)
53-
thread_pool_pimpl(thread_pool_pimpl const&) = delete;
54-
thread_pool_pimpl& operator=(thread_pool_pimpl const&) = delete;
55-
#endif // !defined(BOOST_NO_CXX11_DEFAULTED_FUNCTIONS)
61+
~pimpl() {
62+
sentinel_.reset();
63+
try {
64+
for (auto& thread : worker_threads_)
65+
thread.join();
66+
}
67+
catch (...) {
68+
BOOST_ASSERT(false &&
69+
"A handler was not supposed to throw, but one did.");
70+
std::abort();
71+
}
72+
}
5673

57-
thread_pool_pimpl(thread_pool_pimpl && other) { other.swap(*this); }
74+
std::size_t threads_;
75+
io_service_ptr io_service_;
76+
std::vector<std::thread> worker_threads_;
77+
sentinel_ptr sentinel_;
5878

59-
std::size_t const thread_count() const { return threads_; }
79+
};
6080

61-
void post(std::function<void()> f) { io_service_->post(f); }
81+
thread_pool::thread_pool(std::size_t threads,
82+
io_service_ptr io_service,
83+
std::vector<std::thread> worker_threads)
84+
: pimpl_(new (std::nothrow)
85+
pimpl(threads, io_service, std::move(worker_threads))) {
6286

63-
~thread_pool_pimpl() {
64-
sentinel_.reset();
65-
try {
66-
for (auto& thread : worker_threads_)
67-
thread.join();
68-
}
69-
catch (...) {
70-
BOOST_ASSERT(false &&
71-
"A handler was not supposed to throw, but one did.");
72-
std::abort();
73-
}
7487
}
7588

76-
void swap(thread_pool_pimpl& other) {
77-
using std::swap;
78-
swap(other.threads_, threads_);
79-
swap(other.io_service_, io_service_);
80-
swap(other.worker_threads_, worker_threads_);
81-
swap(other.sentinel_, sentinel_);
89+
std::size_t const thread_pool::thread_count() const {
90+
return pimpl_->threads_;
8291
}
8392

84-
protected:
85-
std::size_t threads_;
86-
io_service_ptr io_service_;
87-
std::vector<std::thread> worker_threads_;
88-
sentinel_ptr sentinel_;
89-
};
90-
91-
thread_pool::thread_pool(std::size_t threads,
92-
io_service_ptr io_service,
93-
std::vector<std::thread> worker_threads)
94-
: pimpl(
95-
new (std::nothrow)
96-
thread_pool_pimpl(threads, io_service, std::move(worker_threads))) {}
97-
98-
std::size_t const thread_pool::thread_count() const {
99-
return pimpl->thread_count();
100-
}
101-
102-
void thread_pool::post(std::function<void()> f) { pimpl->post(f); }
103-
104-
void thread_pool::swap(thread_pool& other) {
105-
std::swap(other.pimpl, this->pimpl);
106-
}
93+
void thread_pool::post(std::function<void()> f) {
94+
pimpl_->io_service_->post(f);
95+
}
10796

108-
thread_pool::~thread_pool() { delete pimpl; }
97+
void thread_pool::swap(thread_pool& other) {
98+
std::swap(other.pimpl_, this->pimpl_);
99+
}
109100

101+
thread_pool::~thread_pool() {
102+
delete pimpl_;
103+
}
110104
} // namespace concurrency
111105
} // namespace network
112106

0 commit comments

Comments
 (0)