-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdriver_impl.h
106 lines (84 loc) · 2.88 KB
/
driver_impl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#ifndef SOCKPUPPET_DRIVER_IMPL_H
#define SOCKPUPPET_DRIVER_IMPL_H
#include "socket_impl.h" // for SocketImpl
#include "sockpuppet/address.h" // for Address
#include "sockpuppet/socket_async.h" // for Driver
#include "todo_impl.h" // for ToDos
#ifdef _WIN32
# include <winsock2.h> // for pollfd
#else
# include <poll.h> // for pollfd
#endif // _WIN32
#include <atomic> // for std::atomic
#include <functional> // for std::reference_wrapper
#include <memory> // for std::shared_ptr
#include <mutex> // for std::mutex
#include <vector> // for std::vector
namespace sockpuppet {
struct Driver::DriverImpl
{
using AddressShared = std::shared_ptr<Address::AddressImpl>;
using SocketRef = std::reference_wrapper<SocketAsyncImpl>;
// StepGuard and StopGuard perform a handshake to obtain stepMtx
// with pauseMtx used to force Step() to yield
struct StepGuard
{
std::unique_lock<std::recursive_mutex> stepLock;
std::unique_lock<std::mutex> pauseLock;
StepGuard(DriverImpl &impl);
StepGuard(StepGuard const &) = delete;
StepGuard(StepGuard &&) = delete;
~StepGuard();
StepGuard &operator=(StepGuard const &) = delete;
StepGuard &operator=(StepGuard &&) = delete;
};
struct PauseGuard
{
std::unique_lock<std::recursive_mutex> stepLock;
PauseGuard(DriverImpl &impl);
PauseGuard(PauseGuard const &) = delete;
PauseGuard(PauseGuard &&) = delete;
~PauseGuard();
PauseGuard &operator=(PauseGuard const &) = delete;
PauseGuard &operator=(PauseGuard &&) = delete;
};
/// Internal signalling pipe for cancelling Step()
AddressShared pipeToAddr;
SocketImpl pipeFrom;
SocketImpl pipeTo;
/// Lists of managed ToDos and Sockets protected by mutex
std::recursive_mutex stepMtx;
std::mutex pauseMtx;
ToDos todos; // guarded by stepMtx
std::vector<SocketRef> sockets; // guarded by stepMtx
std::vector<pollfd> pfds; // front element belongs to internal signalling pipe; guarded by stepMtx
std::atomic<bool> shouldStop; ///< Flag for cancelling Run()
DriverImpl();
DriverImpl(DriverImpl const &) = delete;
DriverImpl(DriverImpl &&) = delete;
~DriverImpl();
DriverImpl &operator=(DriverImpl const &) = delete;
DriverImpl &operator=(DriverImpl &&) = delete;
void Step(Duration timeout);
template<typename Deadline>
Duration StepTodos(Deadline deadline);
void StepSockets(Duration timeout);
void Run();
void Stop();
// interface for ToDoImpl
void ToDoInsert(ToDoShared todo);
void ToDoRemove(ToDo::ToDoImpl *todo);
void ToDoMove(ToDoShared todo, TimePoint when);
// interface for SocketAsyncImpl
void AsyncRegister(SocketAsyncImpl &sock);
void AsyncUnregister(SOCKET fd);
void AsyncWantSend(SOCKET fd);
// interactions with signalling pipe
void Bump();
void Unbump();
// interactions with sockets
void QuerySockets();
void DoOneSocketTask();
};
} // namespace sockpuppet
#endif // SOCKPUPPET_DRIVER_IMPL_H