forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPacketReceiver.h
161 lines (131 loc) · 4.69 KB
/
PacketReceiver.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#pragma once
#if defined(OS_LINUX)
#include <variant>
#include <Client/IConnections.h>
#include <Common/FiberStack.h>
#include <Common/Fiber.h>
#include <Common/Epoll.h>
#include <Common/TimerDescriptor.h>
namespace DB
{
/// Class for nonblocking packet receiving. It runs connection->receivePacket
/// in fiber and sets special read callback which is called when
/// reading from socket blocks. When read callback is called,
/// socket and receive timeout are added in epoll and execution returns to the main program.
/// So, you can poll this epoll file descriptor to determine when to resume
/// packet receiving.
class PacketReceiver
{
public:
explicit PacketReceiver(Connection * connection_) : connection(connection_)
{
epoll.add(receive_timeout.getDescriptor());
epoll.add(connection->getSocket()->impl()->sockfd());
fiber = boost::context::fiber(std::allocator_arg_t(), fiber_stack, Routine{*this});
}
/// Resume packet receiving.
std::variant<int, Packet, Poco::Timespan, std::exception_ptr> resume()
{
/// If there is no pending data, check receive timeout.
if (!connection->hasReadPendingData() && !checkReceiveTimeout())
{
/// Receive timeout expired.
return Poco::Timespan();
}
/// Resume fiber.
fiber = std::move(fiber).resume();
if (exception)
return std::move(exception);
if (is_read_in_process)
return epoll.getFileDescriptor();
/// Receiving packet was finished.
return std::move(packet);
}
void cancel()
{
Fiber to_destroy = std::move(fiber);
connection = nullptr;
}
int getFileDescriptor() const { return epoll.getFileDescriptor(); }
private:
/// When epoll file descriptor is ready, check if it's an expired timeout.
/// Return false if receive timeout expired and socket is not ready, return true otherwise.
bool checkReceiveTimeout()
{
bool is_socket_ready = false;
bool is_receive_timeout_expired = false;
epoll_event events[2];
events[0].data.fd = events[1].data.fd = -1;
size_t ready_count = epoll.getManyReady(2, events, true);
for (size_t i = 0; i != ready_count; ++i)
{
if (events[i].data.fd == connection->getSocket()->impl()->sockfd())
is_socket_ready = true;
if (events[i].data.fd == receive_timeout.getDescriptor())
is_receive_timeout_expired = true;
}
if (is_receive_timeout_expired && !is_socket_ready)
{
receive_timeout.reset();
return false;
}
return true;
}
struct Routine
{
PacketReceiver & receiver;
struct ReadCallback
{
PacketReceiver & receiver;
Fiber & sink;
void operator()(int, Poco::Timespan timeout, const std::string &)
{
receiver.receive_timeout.setRelative(timeout);
receiver.is_read_in_process = true;
sink = std::move(sink).resume();
receiver.is_read_in_process = false;
receiver.receive_timeout.reset();
}
};
Fiber operator()(Fiber && sink)
{
try
{
while (true)
{
{
AsyncCallbackSetter async_setter(receiver.connection, ReadCallback{receiver, sink});
receiver.packet = receiver.connection->receivePacket();
}
sink = std::move(sink).resume();
}
}
catch (const boost::context::detail::forced_unwind &)
{
/// This exception is thrown by fiber implementation in case if fiber is being deleted but hasn't exited
/// It should not be caught or it will segfault.
/// Other exceptions must be caught
throw;
}
catch (...)
{
receiver.exception = std::current_exception();
}
return std::move(sink);
}
};
Connection * connection;
Packet packet;
Fiber fiber;
FiberStack fiber_stack;
/// We use timer descriptor for checking socket receive timeout.
TimerDescriptor receive_timeout;
/// In read callback we add socket file descriptor and timer descriptor with receive timeout
/// in epoll, so we can return epoll file descriptor outside for polling.
Epoll epoll;
/// If and exception occurred in fiber resume, we save it and rethrow.
std::exception_ptr exception;
bool is_read_in_process = false;
};
}
#endif