Skip to content

Commit

Permalink
net: move some device and qp methods out-of-line
Browse files Browse the repository at this point in the history
  • Loading branch information
avikivity committed Jan 22, 2015
1 parent 5678a09 commit d0ec993
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 100 deletions.
114 changes: 114 additions & 0 deletions net/net.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,120 @@ using std::move;

namespace net {

inline
bool qp::poll_tx() {
if (_tx_packetq.size() < 16) {
// refill send queue from upper layers
uint32_t work;
do {
work = 0;
for (auto&& pr : _pkt_providers) {
auto p = pr();
if (p) {
work++;
_tx_packetq.push_back(std::move(p.value()));
if (_tx_packetq.size() == 128) {
break;
}
}
}
} while (work && _tx_packetq.size() < 128);

}
if (!_tx_packetq.empty()) {
_last_tx_bunch = send(_tx_packetq);
_packets_snt += _last_tx_bunch;
return true;
}

return false;
}

qp::qp()
: _tx_poller([this] { return poll_tx(); })
, _collectd_regs({
// queue_length value:GAUGE:0:U
// Absolute value of num packets in last tx bunch.
scollectd::add_polled_metric(scollectd::type_instance_id("network"
, scollectd::per_cpu_plugin_instance
, "queue_length", "tx-packet-queue")
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_tx_bunch)
),
// total_operations value:DERIVE:0:U
scollectd::add_polled_metric(scollectd::type_instance_id("network"
, scollectd::per_cpu_plugin_instance
, "total_operations", "tx-packets")
, scollectd::make_typed(scollectd::data_type::DERIVE, _packets_snt)
),
// queue_length value:GAUGE:0:U
// Absolute value of num packets in last rx bunch.
scollectd::add_polled_metric(scollectd::type_instance_id("network"
, scollectd::per_cpu_plugin_instance
, "queue_length", "rx-packet-queue")
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_rx_bunch)
),
// total_operations value:DERIVE:0:U
scollectd::add_polled_metric(scollectd::type_instance_id("network"
, scollectd::per_cpu_plugin_instance
, "total_operations", "rx-packets")
, scollectd::make_typed(scollectd::data_type::DERIVE, _packets_rcv)
),
}) {
}

qp::~qp() {
}

void qp::configure_proxies(const std::map<unsigned, float>& cpu_weights) {
assert(!cpu_weights.empty());
if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine.cpu_id())) {
// special case queue sending to self only, to avoid requiring a hash value
return;
}
register_packet_provider([this] {
std::experimental::optional<packet> p;
if (!_proxy_packetq.empty()) {
p = std::move(_proxy_packetq.front());
_proxy_packetq.pop_front();
}
return p;
});
build_sw_reta(cpu_weights);
}

void qp::build_sw_reta(const std::map<unsigned, float>& cpu_weights) {
float total_weight = 0;
for (auto&& x : cpu_weights) {
total_weight += x.second;
}
float accum = 0;
unsigned idx = 0;
std::array<uint8_t, 128> reta;
for (auto&& entry : cpu_weights) {
auto cpu = entry.first;
auto weight = entry.second;
accum += weight;
while (idx < (accum / total_weight * reta.size() - 0.5)) {
reta[idx++] = cpu;
}
}
_sw_reta = reta;
}

subscription<packet>
device::receive(std::function<future<> (packet)> next_packet) {
auto sub = _queues[engine.cpu_id()]->_rx_stream.listen(std::move(next_packet));
_queues[engine.cpu_id()]->rx_start();
return std::move(sub);
}

void device::set_local_queue(std::unique_ptr<qp> dev) {
assert(!_queues[engine.cpu_id()]);
_queues[engine.cpu_id()] = dev.get();
engine.at_destroy([dev = std::move(dev)] {});
}


l3_protocol::l3_protocol(interface* netif, eth_protocol_num proto_num, packet_provider_type func)
: _netif(netif), _proto_num(proto_num) {
_netif->register_packet_provider(std::move(func));
Expand Down
107 changes: 7 additions & 100 deletions net/net.hh
Original file line number Diff line number Diff line change
Expand Up @@ -133,35 +133,8 @@ protected:
_packets_rcv += count;
}
public:
qp() : _tx_poller([this] { return poll_tx(); }), _collectd_regs({
// queue_length value:GAUGE:0:U
// Absolute value of num packets in last tx bunch.
scollectd::add_polled_metric(scollectd::type_instance_id("network"
, scollectd::per_cpu_plugin_instance
, "queue_length", "tx-packet-queue")
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_tx_bunch)
),
// total_operations value:DERIVE:0:U
scollectd::add_polled_metric(scollectd::type_instance_id("network"
, scollectd::per_cpu_plugin_instance
, "total_operations", "tx-packets")
, scollectd::make_typed(scollectd::data_type::DERIVE, _packets_snt)
),
// queue_length value:GAUGE:0:U
// Absolute value of num packets in last rx bunch.
scollectd::add_polled_metric(scollectd::type_instance_id("network"
, scollectd::per_cpu_plugin_instance
, "queue_length", "rx-packet-queue")
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_rx_bunch)
),
// total_operations value:DERIVE:0:U
scollectd::add_polled_metric(scollectd::type_instance_id("network"
, scollectd::per_cpu_plugin_instance
, "total_operations", "rx-packets")
, scollectd::make_typed(scollectd::data_type::DERIVE, _packets_rcv)
),
}) {}
virtual ~qp() {}
qp();
virtual ~qp();
virtual future<> send(packet p) = 0;
virtual uint32_t send(circular_buffer<packet>& p) {
uint32_t sent = 0;
Expand All @@ -173,74 +146,16 @@ public:
return sent;
}
virtual void rx_start() {};
void configure_proxies(const std::map<unsigned, float>& cpu_weights) {
assert(!cpu_weights.empty());
if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine.cpu_id())) {
// special case queue sending to self only, to avoid requiring a hash value
return;
}
register_packet_provider([this] {
std::experimental::optional<packet> p;
if (!_proxy_packetq.empty()) {
p = std::move(_proxy_packetq.front());
_proxy_packetq.pop_front();
}
return p;
});
build_sw_reta(cpu_weights);
}
void configure_proxies(const std::map<unsigned, float>& cpu_weights);
// build REdirection TAble for cpu_weights map: target cpu -> weight
void build_sw_reta(const std::map<unsigned, float>& cpu_weights) {
float total_weight = 0;
for (auto&& x : cpu_weights) {
total_weight += x.second;
}
float accum = 0;
unsigned idx = 0;
std::array<uint8_t, 128> reta;
for (auto&& entry : cpu_weights) {
auto cpu = entry.first;
auto weight = entry.second;
accum += weight;
while (idx < (accum / total_weight * reta.size() - 0.5)) {
reta[idx++] = cpu;
}
}
_sw_reta = reta;
}
void build_sw_reta(const std::map<unsigned, float>& cpu_weights);
void proxy_send(packet p) {
_proxy_packetq.push_back(std::move(p));
}
void register_packet_provider(packet_provider_type func) {
_pkt_providers.push_back(std::move(func));
}
bool poll_tx() {
if (_tx_packetq.size() < 16) {
// refill send queue from upper layers
uint32_t work;
do {
work = 0;
for (auto&& pr : _pkt_providers) {
auto p = pr();
if (p) {
work++;
_tx_packetq.push_back(std::move(p.value()));
if (_tx_packetq.size() == 128) {
break;
}
}
}
} while (work && _tx_packetq.size() < 128);

}
if (!_tx_packetq.empty()) {
_last_tx_bunch = send(_tx_packetq);
_packets_snt += _last_tx_bunch;
return true;
}

return false;
}
bool poll_tx();
friend class device;
};

Expand All @@ -256,11 +171,7 @@ public:
qp& queue_for_cpu(unsigned cpu) { return *_queues[cpu]; }
qp& local_queue() { return queue_for_cpu(engine.cpu_id()); }
void l2receive(packet p) { _queues[engine.cpu_id()]->_rx_stream.produce(std::move(p)); }
subscription<packet> receive(std::function<future<> (packet)> next_packet) {
auto sub = _queues[engine.cpu_id()]->_rx_stream.listen(std::move(next_packet));
_queues[engine.cpu_id()]->rx_start();
return std::move(sub);
}
subscription<packet> receive(std::function<future<> (packet)> next_packet);
virtual ethernet_address hw_address() = 0;
virtual net::hw_features hw_features() = 0;
virtual uint16_t hw_queues_count() { return 1; }
Expand All @@ -269,11 +180,7 @@ public:
virtual unsigned hash2qid(uint32_t hash) {
return hash % hw_queues_count();
}
void set_local_queue(std::unique_ptr<qp> dev) {
assert(!_queues[engine.cpu_id()]);
_queues[engine.cpu_id()] = dev.get();
engine.at_destroy([dev = std::move(dev)] {});
}
void set_local_queue(std::unique_ptr<qp> dev);
template <typename Func>
unsigned forward_dst(unsigned src_cpuid, Func&& hashfn) {
auto& qp = queue_for_cpu(src_cpuid);
Expand Down

0 comments on commit d0ec993

Please sign in to comment.