Skip to content

Commit

Permalink
Simplify and fix TCP netprobe probe (#690)
Browse files Browse the repository at this point in the history
* add connect failure metric
* simplify tcp probe. use target id for transaction id.
* remove header, add some comments
* use port based transaction id (via string) for icmp
  • Loading branch information
weyrick authored Oct 7, 2023
1 parent c3a3a08 commit 09d427c
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 67 deletions.
25 changes: 17 additions & 8 deletions src/handlers/netprobe/NetProbeStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void NetProbeStreamHandler::probe_signal_send(pcpp::Packet &payload, TestType ty
}
} else if (type == TestType::TCP) {
if (auto tcp = payload.getLayerOfType<pcpp::TcpLayer>(); tcp != nullptr) {
_metrics->process_netprobe_tcp(static_cast<uint32_t>(tcp->getSrcPort()), true, name, stamp);
_metrics->process_netprobe_tcp(true, name, stamp);
}
}
}
Expand All @@ -92,7 +92,7 @@ void NetProbeStreamHandler::probe_signal_recv(pcpp::Packet &payload, TestType ty
}
} else if (type == TestType::TCP) {
if (auto tcp = payload.getLayerOfType<pcpp::TcpLayer>(); tcp != nullptr) {
_metrics->process_netprobe_tcp(static_cast<uint32_t>(tcp->getDstPort()), false, name, stamp);
_metrics->process_netprobe_tcp(false, name, stamp);
}
}
}
Expand All @@ -119,6 +119,7 @@ void NetProbeMetricsBucket::specialized_merge(const AbstractMetricsBucket &o, Me
if (group_enabled(group::NetProbeMetrics::Counters)) {
_targets_metrics[targetId]->attempts += target.second->attempts;
_targets_metrics[targetId]->successes += target.second->successes;
_targets_metrics[targetId]->connect_failures += target.second->connect_failures;
_targets_metrics[targetId]->dns_failures += target.second->dns_failures;
_targets_metrics[targetId]->timed_out += target.second->timed_out;
}
Expand All @@ -143,6 +144,7 @@ void NetProbeMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelM
if (group_enabled(group::NetProbeMetrics::Counters)) {
target.second->attempts.to_prometheus(out, target_labels);
target.second->successes.to_prometheus(out, target_labels);
target.second->connect_failures.to_prometheus(out, target_labels);
target.second->dns_failures.to_prometheus(out, target_labels);
target.second->timed_out.to_prometheus(out, target_labels);
}
Expand Down Expand Up @@ -198,6 +200,7 @@ void NetProbeMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, t
if (group_enabled(group::NetProbeMetrics::Counters)) {
target.second->attempts.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->successes.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->connect_failures.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->dns_failures.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->timed_out.to_opentelemetry(scope, start_ts, end_ts, target_labels);
}
Expand Down Expand Up @@ -252,6 +255,7 @@ void NetProbeMetricsBucket::to_json(json &j) const
if (group_enabled(group::NetProbeMetrics::Counters)) {
target.second->attempts.to_json(j["targets"][targetId]);
target.second->successes.to_json(j["targets"][targetId]);
target.second->connect_failures.to_json(j["targets"][targetId]);
target.second->dns_failures.to_json(j["targets"][targetId]);
target.second->timed_out.to_json(j["targets"][targetId]);
}
Expand Down Expand Up @@ -311,9 +315,12 @@ void NetProbeMetricsBucket::process_failure(ErrorType error, const std::string &
break;
case ErrorType::Timeout:
++_targets_metrics[target]->timed_out;
break;
case ErrorType::SocketError:
case ErrorType::InvalidIp:
case ErrorType::ConnectionFailure:
case ErrorType::ConnectFailure:
++_targets_metrics[target]->connect_failures;
break;
default:
break;
}
Expand Down Expand Up @@ -374,12 +381,14 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const

if (layer->getMessageType() == pcpp::ICMP_ECHO_REQUEST) {
if (auto request = layer->getEchoRequestData(); request != nullptr) {
_request_reply_manager->start_transaction((static_cast<uint32_t>(request->header->id) << 16) | request->header->sequence, {{stamp, {0, 0}}, target});
auto ping_id = (static_cast<uint32_t>(request->header->id) << 16) | request->header->sequence;
_request_reply_manager->start_transaction(std::to_string(ping_id), {{stamp, {0, 0}}, target});
}
live_bucket()->process_attempts(_deep_sampling_now, target);
} else if (layer->getMessageType() == pcpp::ICMP_ECHO_REPLY) {
if (auto reply = layer->getEchoReplyData(); reply != nullptr) {
auto xact = _request_reply_manager->maybe_end_transaction((static_cast<uint32_t>(reply->header->id) << 16) | reply->header->sequence, stamp);
auto ping_id = (static_cast<uint32_t>(reply->header->id) << 16) | reply->header->sequence;
auto xact = _request_reply_manager->maybe_end_transaction(std::to_string(ping_id), stamp);
if (xact.first == Result::Valid) {
live_bucket()->new_transaction(_deep_sampling_now, xact.second);
} else if (xact.first == Result::TimedOut) {
Expand All @@ -389,16 +398,16 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const
}
}

void NetProbeMetricsManager::process_netprobe_tcp(uint32_t port, bool send, const std::string &target, timespec stamp)
void NetProbeMetricsManager::process_netprobe_tcp(bool send, const std::string &target, timespec stamp)
{
// base event
new_event(stamp);

if (send) {
_request_reply_manager->start_transaction(port, {{stamp, {0, 0}}, target});
_request_reply_manager->start_transaction(target, {{stamp, {0, 0}}, target});
live_bucket()->process_attempts(_deep_sampling_now, target);
} else {
auto xact = _request_reply_manager->maybe_end_transaction(port, stamp);
auto xact = _request_reply_manager->maybe_end_transaction(target, stamp);
if (xact.first == Result::Valid) {
live_bucket()->new_transaction(_deep_sampling_now, xact.second);
} else if (xact.first == Result::TimedOut) {
Expand Down
8 changes: 5 additions & 3 deletions src/handlers/netprobe/NetProbeStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct Target {
Counter successes;
Counter minimum;
Counter maximum;
Counter connect_failures;
Counter dns_failures;
Counter timed_out;

Expand All @@ -59,7 +60,8 @@ struct Target {
, successes(NET_PROBE_SCHEMA, {"successes"}, "Total Net Probe successes")
, minimum(NET_PROBE_SCHEMA, {"response_min_us"}, "Minimum response time measured in the reporting interval")
, maximum(NET_PROBE_SCHEMA, {"response_max_us"}, "Maximum response time measured in the reporting interval")
, dns_failures(NET_PROBE_SCHEMA, {"dns_lookup_failures"}, "Total Net Probe failures when performed DNS lookup")
, connect_failures(NET_PROBE_SCHEMA, {"connect_failures"}, "Total Net Probe failures when performing a TCP socket connection")
, dns_failures(NET_PROBE_SCHEMA, {"dns_lookup_failures"}, "Total Net Probe failures when performing a DNS lookup")
, timed_out(NET_PROBE_SCHEMA, {"packets_timeout"}, "Total Net Probe timeout transactions")
{
}
Expand Down Expand Up @@ -98,7 +100,7 @@ class NetProbeMetricsBucket final : public visor::AbstractMetricsBucket

class NetProbeMetricsManager final : public visor::AbstractMetricsManager<NetProbeMetricsBucket>
{
typedef TransactionManager<uint32_t, NetProbeTransaction, std::hash<uint32_t>> NetProbeTransactionManager;
typedef TransactionManager<std::string, NetProbeTransaction, std::hash<std::string>> NetProbeTransactionManager;
std::unique_ptr<NetProbeTransactionManager> _request_reply_manager;

public:
Expand All @@ -122,7 +124,7 @@ class NetProbeMetricsManager final : public visor::AbstractMetricsManager<NetPro
void process_filtered(timespec stamp);
void process_failure(ErrorType error, const std::string &target);
void process_netprobe_icmp(pcpp::IcmpLayer *layer, const std::string &target, timespec stamp);
void process_netprobe_tcp(uint32_t port, bool send, const std::string &target, timespec stamp);
void process_netprobe_tcp(bool send, const std::string &target, timespec stamp);
};

class NetProbeStreamHandler final : public visor::StreamMetricsHandler<NetProbeMetricsManager>
Expand Down
2 changes: 1 addition & 1 deletion src/inputs/netprobe/NetProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ enum class ErrorType {
SocketError,
DnsLookupFailure,
InvalidIp,
ConnectionFailure
ConnectFailure
};

enum class TestType {
Expand Down
3 changes: 3 additions & 0 deletions src/inputs/netprobe/PingProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ bool PingProbe::start(std::shared_ptr<uvw::Loop> io_loop)
throw NetProbeException("PingProbe - unable to initialize AsyncHandle receiver");
}
_recv_handler->on<uvw::AsyncEvent>([this](const auto &, auto &) {
// note this processes received packets across ALL active ping probes (because of the single receiver thread)
// the expectation is that packets which did not originate from this probe will be ignored by the handler attached to this probe,
// since it did not originate from it
for (auto &[packet, stamp] : PingReceiver::recv_packets) {
_recv(packet, TestType::Ping, _name, stamp);
}
Expand Down
54 changes: 13 additions & 41 deletions src/inputs/netprobe/TcpProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,17 @@ bool TcpProbe::start(std::shared_ptr<uvw::Loop> io_loop)
}

_interval_timer->on<uvw::TimerEvent>([this](const auto &, auto &) {
_internal_sequence = 0;
_timeout_timer = _io_loop->resource<uvw::TimerHandle>();
if (!_timeout_timer) {
throw NetProbeException("Netprobe - unable to initialize timeout TimerHandle");
}

_timeout_timer->on<uvw::TimerEvent>([this](const auto &, auto &) {
_internal_sequence = _config.packets_per_test;
_fail(ErrorType::Timeout, TestType::Ping, _name);
if (_internal_timer) {
_internal_timer->stop();
}
_interval_timer->again();
});

if (!_dns.empty()) {
auto [ip, ipv4] = _resolve_dns();
_ip_str = ip;
_is_ipv4 = ipv4;
if (_ip_str.empty()) {
_fail(ErrorType::DnsLookupFailure, TestType::Ping, _name);
_fail(ErrorType::DnsLookupFailure, TestType::TCP, _name);
return;
}
}

_internal_timer = _io_loop->resource<uvw::TimerHandle>();
_internal_timer->on<uvw::TimerEvent>([this](const auto &, auto &) {
if (_internal_sequence < _config.packets_per_test) {
_internal_sequence++;
_timeout_timer->stop();
_timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0});
_perform_tcp_process();
}
});
_timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0});
_perform_tcp_process();
_internal_sequence++;
_internal_timer->start(uvw::TimerHandle::Time{_config.packets_interval_msec}, uvw::TimerHandle::Time{_config.packets_interval_msec});
});

_interval_timer->start(uvw::TimerHandle::Time{0}, uvw::TimerHandle::Time{_config.interval_msec});
Expand All @@ -80,33 +53,32 @@ void TcpProbe::_perform_tcp_process()
{
_client = _io_loop->resource<uvw::TCPHandle>();
_client->on<uvw::ErrorEvent>([this](const auto &, auto &) {
_fail(ErrorType::ConnectionFailure, TestType::Ping, _name);
_fail(ErrorType::ConnectFailure, TestType::TCP, _name);
});
_client->once<uvw::CloseEvent>([this](const uvw::CloseEvent &, uvw::TCPHandle &) {
timespec stamp;
std::timespec_get(&stamp, TIME_UTC);
pcpp::Packet packet;
auto layer = pcpp::TcpLayer(static_cast<uint16_t>(_port), _client_port);
packet.addLayer(&layer);
_recv(packet, TestType::TCP, _name, stamp);
});
_client->once<uvw::ShutdownEvent>([](const uvw::ShutdownEvent &, uvw::TCPHandle &handle) {
_client->once<uvw::ShutdownEvent>([this](const uvw::ShutdownEvent &, uvw::TCPHandle &handle) {
handle.close();
});
_client->once<uvw::ConnectEvent>([this](const uvw::ConnectEvent &, uvw::TCPHandle &handle) {
timespec stamp;
std::timespec_get(&stamp, TIME_UTC);
_client_port = static_cast<uint16_t>(handle.sock().port);
pcpp::Packet packet;
auto layer = pcpp::TcpLayer(_client_port, static_cast<uint16_t>(_port));
auto layer = pcpp::TcpLayer(0, static_cast<uint16_t>(_dst_port));
packet.addLayer(&layer);
_send(packet, TestType::TCP, _name, stamp);
_recv(packet, TestType::TCP, _name, stamp);
handle.shutdown();
});
timespec stamp;
std::timespec_get(&stamp, TIME_UTC);
pcpp::Packet packet;
auto layer = pcpp::TcpLayer(0, static_cast<uint16_t>(_dst_port));
packet.addLayer(&layer);
_send(packet, TestType::TCP, _name, stamp);
if (_is_ipv4) {
_client->connect<uvw::TCPHandle::IPv4>(_ip_str, _port);
_client->connect<uvw::TCPHandle::IPv4>(_ip_str, _dst_port);
} else {
_client->connect<uvw::TCPHandle::IPv6>(_ip_str, _port);
_client->connect<uvw::TCPHandle::IPv6>(_ip_str, _dst_port);
}
}

Expand Down
16 changes: 2 additions & 14 deletions src/inputs/netprobe/TcpProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,13 @@

namespace visor::input::netprobe {

/**
* @class PingProbe
* @brief PingProbe class used for sending ICMP Echo Requests.
*
* This class is created for each specified target. However, it reuses a shared socket per thread (per UV_LOOP).
* I.e. each unique NetProbeInputStream with Ping Type will have a socket to send ICMP Echo Request.
*/
class TcpProbe final : public NetProbe
{
uint32_t _port;
uint16_t _client_port;
uint32_t _dst_port;
bool _init{false};
bool _is_ipv4{false};
uint16_t _internal_sequence{0};
std::string _ip_str;
std::shared_ptr<uvw::TimerHandle> _interval_timer;
std::shared_ptr<uvw::TimerHandle> _internal_timer;
std::shared_ptr<uvw::TimerHandle> _timeout_timer;

std::shared_ptr<uvw::TCPHandle> _client;

Expand All @@ -37,8 +26,7 @@ class TcpProbe final : public NetProbe
public:
TcpProbe(uint16_t id, const std::string &name, const pcpp::IPAddress &ip, const std::string &dns, uint32_t port)
: NetProbe(id, name, ip, dns)
, _port(port)
, _client_port(0){};
, _dst_port(port) {};
~TcpProbe() = default;
bool start(std::shared_ptr<uvw::Loop> io_loop) override;
bool stop() override;
Expand Down

0 comments on commit 09d427c

Please sign in to comment.