forked from scylladb/seastar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathposix-stack.hh
189 lines (171 loc) · 7.11 KB
/
posix-stack.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#pragma once
#include "core/reactor.hh"
#include "core/sharded.hh"
#include "stack.hh"
#include <boost/program_options.hpp>
namespace seastar {
namespace net {
using namespace seastar;
// Connection tracking used to spread connections across shards.
//
// We can't keep this state in any of the socket servers as instance members, because a
// connection can outlive the socket server. To avoid having the whole socket_server tracked
// as a shared pointer, the counters live in this separate conntrack structure.
//
// Right now this class is used by the posix_server_socket_impl, but it could be used by any other.
class conntrack {
    // Per-shard counters, sized to smp::count at construction.
    class load_balancer {
        std::vector<unsigned> _cpu_load; // indexed by shard_id
    public:
        load_balancer() : _cpu_load(size_t(smp::count), 0) {}
        // Decrements the counter of shard `cpu`.
        void closed_cpu(shard_id cpu) {
            _cpu_load[cpu]--;
        }
        // Returns the shard whose counter is currently the smallest (the first such
        // shard on ties) and increments that counter before returning.
        shard_id next_cpu() {
            // FIXME: This only balances the number of connections handed to each shard.
            // A more complex version can keep track of the amount of activity in each connection,
            // and use that information.
            auto min_el = std::min_element(_cpu_load.begin(), _cpu_load.end());
            auto cpu = shard_id(std::distance(_cpu_load.begin(), min_el));
            _cpu_load[cpu]++;
            return cpu;
        }
    };
    lw_shared_ptr<load_balancer> _lb;
    void closed_cpu(shard_id cpu) {
        _lb->closed_cpu(cpu);
    }
public:
    // Move-only token for one slot obtained from next_cpu(). Destroying a non-empty
    // handle gives the slot back by decrementing the counter of its target shard.
    class handle {
        // Value-initialized so that a default-constructed (empty) handle never
        // exposes indeterminate values, e.g. through cpu().
        shard_id _host_cpu{};   // shard on which the handle was constructed
        shard_id _target_cpu{}; // shard chosen by the load balancer
        foreign_ptr<lw_shared_ptr<load_balancer>> _lb;
    public:
        // Empty handle: holds no load balancer, so its destructor does nothing.
        handle() : _lb(nullptr) {}
        handle(shard_id cpu, lw_shared_ptr<load_balancer> lb)
            : _host_cpu(engine().cpu_id())
            , _target_cpu(cpu)
            , _lb(make_foreign(std::move(lb))) {}
        handle(const handle&) = delete;
        handle(handle&&) = default;
        ~handle() {
            if (!_lb) {
                return;
            }
            // The decrement is submitted to _host_cpu, the shard on which this handle
            // was constructed. The returned future is explicitly discarded: a
            // destructor cannot wait for it.
            (void)smp::submit_to(_host_cpu, [cpu = _target_cpu, lb = std::move(_lb)] {
                lb->closed_cpu(cpu);
            });
        }
        // Shard selected for this handle.
        shard_id cpu() const {
            return _target_cpu;
        }
    };
    friend class handle;
    conntrack() : _lb(make_lw_shared<load_balancer>()) {}
    // Selects a shard via load_balancer::next_cpu() and wraps it in a handle that
    // shares ownership of the load balancer.
    handle get_handle() {
        return handle(_lb->next_cpu(), _lb);
    }
};
// data_source_impl that holds a shared pollable_fd.
// get() and close() are only declared here; they are defined outside this header.
class posix_data_source_impl final : public data_source_impl {
    lw_shared_ptr<pollable_fd> _fd;
    temporary_buffer<char> _buf;
    size_t _buf_size;
public:
    // Stores `fd`, constructs `_buf` from `buf_size` and records `buf_size` in
    // `_buf_size`. `buf_size` defaults to 8192.
    explicit posix_data_source_impl(lw_shared_ptr<pollable_fd> fd, size_t buf_size = 8192)
        : _fd(std::move(fd))
        , _buf(buf_size)
        , _buf_size(buf_size) {}

    future<temporary_buffer<char>> get() override;
    future<> close() override;
};
// data_sink_impl that holds a shared pollable_fd.
// The put() overloads and close() are only declared here; they are defined
// outside this header.
class posix_data_sink_impl : public data_sink_impl {
    lw_shared_ptr<pollable_fd> _fd;
    packet _p;
public:
    // Stores `fd`; `_p` is default-constructed.
    explicit posix_data_sink_impl(lw_shared_ptr<pollable_fd> fd)
        : _fd(std::move(fd)) {}

    future<> put(packet p) override;
    future<> put(temporary_buffer<char> buf) override;
    future<> close() override;
};
// server_socket_impl parameterized on the transport and identified by a
// socket_address. All member functions are only declared here.
template <transport Transport>
class posix_ap_server_socket_impl : public server_socket_impl {
    // A file descriptor together with the socket_address stored alongside it.
    struct connection {
        pollable_fd fd;
        socket_address addr;
        connection(pollable_fd xfd, socket_address xaddr)
            : fd(std::move(xfd))
            , addr(xaddr) {}
    };

    // Per-thread tables keyed by ::sockaddr_in.
    // NOTE(review): presumably `sockets` holds promises for pending accept() calls and
    // `conn_q` holds connections that arrived before anyone accepted them - confirm
    // against the out-of-line definitions.
    static thread_local std::unordered_map<::sockaddr_in, promise<connected_socket, socket_address>> sockets;
    static thread_local std::unordered_multimap<::sockaddr_in, connection> conn_q;

    socket_address _sa;
public:
    explicit posix_ap_server_socket_impl(socket_address sa)
        : _sa(sa) {}

    virtual future<connected_socket, socket_address> accept() override;
    virtual void abort_accept() override;

    // NOTE(review): presumably hands the connection (`fd`, `addr`) over to the server
    // socket registered for `sa` on the calling thread - confirm against the definition.
    static void move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr, conntrack::handle handle);
};

// Instantiations for the two transports used in this header.
using posix_tcp_ap_server_socket_impl = posix_ap_server_socket_impl<transport::TCP>;
using posix_sctp_ap_server_socket_impl = posix_ap_server_socket_impl<transport::SCTP>;
// server_socket_impl that owns a listening pollable_fd and a conntrack instance.
// accept() and abort_accept() are only declared here.
template <transport Transport>
class posix_server_socket_impl : public server_socket_impl {
    socket_address _sa;
    pollable_fd _lfd;
    conntrack _conntrack;
public:
    // Stores the address and takes ownership of the listening descriptor.
    explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {}
    // Marked `override` like abort_accept() below, so a signature drift from
    // server_socket_impl::accept() is rejected at compile time.
    virtual future<connected_socket, socket_address> accept() override;
    virtual void abort_accept() override;
};

// Instantiations for the two transports used in this header.
using posix_server_tcp_socket_impl = posix_server_socket_impl<transport::TCP>;
using posix_server_sctp_socket_impl = posix_server_socket_impl<transport::SCTP>;
// server_socket_impl that owns a listening pollable_fd. Unlike
// posix_server_socket_impl it has no conntrack member.
// accept() and abort_accept() are only declared here.
template <transport Transport>
class posix_reuseport_server_socket_impl : public server_socket_impl {
    socket_address _sa;
    pollable_fd _lfd;
public:
    // Stores the address and takes ownership of the listening descriptor.
    explicit posix_reuseport_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {}
    // Marked `override` like abort_accept() below, so a signature drift from
    // server_socket_impl::accept() is rejected at compile time.
    virtual future<connected_socket, socket_address> accept() override;
    virtual void abort_accept() override;
};

// Instantiations for the two transports used in this header.
using posix_reuseport_server_tcp_socket_impl = posix_reuseport_server_socket_impl<transport::TCP>;
using posix_reuseport_server_sctp_socket_impl = posix_reuseport_server_socket_impl<transport::SCTP>;
// network_stack implementation for the POSIX backend.
// listen(), socket() and make_udp_channel() are only declared here.
class posix_network_stack : public network_stack {
private:
    // Snapshot of engine().posix_reuseport_available() taken at construction.
    const bool _reuseport;
public:
    // `opts` is accepted but not read by this constructor.
    explicit posix_network_stack(boost::program_options::variables_map opts) : _reuseport(engine().posix_reuseport_available()) {}
    virtual server_socket listen(socket_address sa, listen_options opts) override;
    virtual ::seastar::socket socket() override;
    virtual net::udp_channel make_udp_channel(ipv4_addr addr) override;
    // Builds a posix_network_stack and returns it, already resolved, as a
    // unique_ptr<network_stack>. `opts` is taken by value, so it is moved into the
    // constructor rather than copied a second time.
    static future<std::unique_ptr<network_stack>> create(boost::program_options::variables_map opts) {
        return make_ready_future<std::unique_ptr<network_stack>>(
                std::unique_ptr<network_stack>(std::make_unique<posix_network_stack>(std::move(opts))));
    }
    // Reports the value captured in _reuseport.
    virtual bool has_per_core_namespace() override { return _reuseport; }
};
// posix_network_stack variant that overrides listen().
// listen() is only declared here.
class posix_ap_network_stack : public posix_network_stack {
private:
    // Own snapshot of engine().posix_reuseport_available(); the base class member of
    // the same name is private and therefore not accessible from this class.
    const bool _reuseport;
public:
    posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)), _reuseport(engine().posix_reuseport_available()) {}
    virtual server_socket listen(socket_address sa, listen_options opts) override;
    // Builds a posix_ap_network_stack and returns it, already resolved, as a
    // unique_ptr<network_stack>. `opts` is taken by value, so it is moved into the
    // constructor rather than copied a second time.
    static future<std::unique_ptr<network_stack>> create(boost::program_options::variables_map opts) {
        return make_ready_future<std::unique_ptr<network_stack>>(
                std::unique_ptr<network_stack>(std::make_unique<posix_ap_network_stack>(std::move(opts))));
    }
};
}
}