Skip to content

Commit

Permalink
hb test: clearer separation between sender and receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
iboB committed Sep 23, 2022
1 parent e514c2d commit 45e9626
Showing 1 changed file with 64 additions and 67 deletions.
131 changes: 64 additions & 67 deletions test/t-WSSessionHeartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,64 +21,64 @@

constexpr uint16_t Test_Port = 7654;

class BasicSession : public fishnets::WebSocketSession
struct SessionPacket
{
enum class Type : uint32_t {
Unknown,
Id,
Heartbeat,
Done,
};
Type type = Type::Unknown;
uint32_t payload = 0;
};

class BasicSender : public fishnets::WebSocketSession
{
public:
void wsReceivedBinary(itlib::span<uint8_t> binary) final override
fishnets::WebSocketSessionOptions getInitialOptions() final override
{
REQUIRE(binary.size() == sizeof(Packet));
memcpy(&m_received.emplace_back(), binary.data(), sizeof(Packet));
fishnets::WebSocketSessionOptions ret;
ret.heartbeatInterval = std::chrono::milliseconds(100);
return ret;
}

void sendNext()
{
REQUIRE(!m_outQueue.empty());
REQUIRE(!m_curOutPacket);
m_curOutPacket.emplace(std::move(m_outQueue.front()));
REQUIRE(!m_curOutSessionPacket);
m_curOutSessionPacket.emplace(std::move(m_outQueue.front()));
m_outQueue.pop();

if (m_curOutPacket->type == Packet::Type::Done)
if (m_curOutSessionPacket->type == SessionPacket::Type::Done)
{
wsClose();
return;
}
wsSend(itlib::span(reinterpret_cast<const uint8_t*>(&m_curOutPacket.value()), sizeof(Packet)));
wsSend(itlib::span(reinterpret_cast<const uint8_t*>(&m_curOutSessionPacket.value()), sizeof(SessionPacket)));
}

void wsCompletedSend() override
{
m_curOutPacket.reset();
m_curOutSessionPacket.reset();
if (m_outQueue.empty()) return; // nothing to do
sendNext();
}

struct Packet
{
enum class Type : uint32_t {
Unknown,
Id,
Heartbeat,
Done,
};
Type type = Type::Unknown;
uint32_t payload = 0;
};
std::queue<Packet> m_outQueue;
std::optional<Packet> m_curOutPacket;
std::queue<SessionPacket> m_outQueue;
std::optional<SessionPacket> m_curOutSessionPacket;

std::vector<Packet> m_received;

void send(Packet packet)
void send(SessionPacket packet)
{
m_outQueue.push(packet);
if (m_curOutPacket) return; // we alrady have stuff going on
if (m_curOutSessionPacket) return; // we alrady have stuff going on
sendNext();
}

struct SenderEntry
{
fishnets::WebSocketSessionPtr (*make)(const fishnets::WebSocketEndpointInfo&);
void (*testResult)(const std::vector<Packet>& result);
void (*testResult)(const std::vector<SessionPacket>& result);
};
static std::vector<SenderEntry> senderRegistry;

Expand All @@ -90,7 +90,7 @@ class BasicSession : public fishnets::WebSocketSession
}
};

std::vector<BasicSession::SenderEntry> BasicSession::senderRegistry;
std::vector<BasicSender::SenderEntry> BasicSender::senderRegistry;

#define DECL_SENDER() \
static const uint32_t id; \
Expand All @@ -101,18 +101,7 @@ std::vector<BasicSession::SenderEntry> BasicSession::senderRegistry;
{ \
return std::make_shared<T>(); \
} \
const uint32_t T::id = BasicSession::registerSender<T>()

class BasicSender : public BasicSession
{
public:
fishnets::WebSocketSessionOptions getInitialOptions() final override
{
fishnets::WebSocketSessionOptions ret;
ret.heartbeatInterval = std::chrono::milliseconds(100);
return ret;
}
};
const uint32_t T::id = BasicSender::registerSender<T>()

class SimpleSender final : public BasicSender
{
Expand All @@ -123,7 +112,7 @@ class SimpleSender final : public BasicSender

void wsOpened() override
{
send({Packet::Type::Id, id});
send({SessionPacket::Type::Id, id});
}

void wsHeartbeat(uint32_t ms) override
Expand All @@ -133,21 +122,21 @@ class SimpleSender final : public BasicSender
CHECK(m_beats < 7);
if (m_beats < 6)
{
send({Packet::Type::Heartbeat, m_beats});
send({SessionPacket::Type::Heartbeat, m_beats});
}
else
{
send({Packet::Type::Done, 0});
send({SessionPacket::Type::Done, 0});
}
}

static void test(const std::vector<Packet>& r)
static void test(const std::vector<SessionPacket>& r)
{
REQUIRE(r.size() == 6);
for (uint32_t i = 1; i < 6; ++i)
{
auto& p = r[i];
CHECK(p.type == Packet::Type::Heartbeat);
CHECK(p.type == SessionPacket::Type::Heartbeat);
CHECK(p.payload == i);
}
}
Expand All @@ -162,7 +151,7 @@ class ManualHBSender final : public BasicSender

void wsOpened() override
{
send({Packet::Type::Id, id});
send({SessionPacket::Type::Id, id});

fishnets::WebSocketSessionOptions opts;
opts.heartbeatInterval = std::chrono::milliseconds(90);
Expand All @@ -176,25 +165,25 @@ class ManualHBSender final : public BasicSender
CHECK(m_beats < 7);
if (m_beats < 6)
{
send({Packet::Type::Heartbeat, m_beats});
send({SessionPacket::Type::Heartbeat, m_beats});
}
else
{
send({Packet::Type::Done, 0});
send({SessionPacket::Type::Done, 0});

fishnets::WebSocketSessionOptions opts;
opts.heartbeatInterval = std::chrono::milliseconds(0);
wsSetOptions(opts);
}
}

static void test(const std::vector<Packet>& r)
static void test(const std::vector<SessionPacket>& r)
{
REQUIRE(r.size() == 6);
for (uint32_t i = 1; i < 6; ++i)
{
auto& p = r[i];
CHECK(p.type == Packet::Type::Heartbeat);
CHECK(p.type == SessionPacket::Type::Heartbeat);
CHECK(p.payload == i);
}
}
Expand All @@ -209,7 +198,7 @@ class RestartSender final : public BasicSender

void wsOpened() override
{
send({Packet::Type::Id, id});
send({SessionPacket::Type::Id, id});

fishnets::WebSocketSessionOptions opts;
opts.heartbeatInterval = std::chrono::milliseconds(50);
Expand All @@ -223,7 +212,7 @@ class RestartSender final : public BasicSender

if (ms == 50)
{
send({Packet::Type::Heartbeat, m_beats * 10});
send({SessionPacket::Type::Heartbeat, m_beats * 10});
if (m_beats == 3)
{
fishnets::WebSocketSessionOptions opts;
Expand All @@ -234,10 +223,10 @@ class RestartSender final : public BasicSender
else
{
CHECK(ms == 150);
send({Packet::Type::Heartbeat, m_beats * 100});
send({SessionPacket::Type::Heartbeat, m_beats * 100});
if (m_beats == 6)
{
send({Packet::Type::Done, 0});
send({SessionPacket::Type::Done, 0});
}
}
}
Expand All @@ -250,39 +239,47 @@ class RestartSender final : public BasicSender
opts.heartbeatInterval = std::chrono::milliseconds(150);
wsSetOptions(opts);
}
BasicSession::wsCompletedSend();
BasicSender::wsCompletedSend();
}

static void test(const std::vector<Packet>& r)
static void test(const std::vector<SessionPacket>& r)
{
REQUIRE(r.size() == 7);
for (uint32_t i = 1; i < 4; ++i)
{
auto& p = r[i];
CHECK(p.type == Packet::Type::Heartbeat);
CHECK(p.type == SessionPacket::Type::Heartbeat);
CHECK(p.payload == i * 10);
}
for (uint32_t i = 4; i < 6; ++i)
{
auto& p = r[i];
CHECK(p.type == Packet::Type::Heartbeat);
CHECK(p.type == SessionPacket::Type::Heartbeat);
CHECK(p.payload == i * 100);
}
}
};

static std::mutex g_resultMutex;
using CaseResultVec = std::vector<std::vector<BasicSession::Packet>>;
using CaseResultVec = std::vector<std::vector<SessionPacket>>;
CaseResultVec* g_caseResult;

struct ReceiverSession final : public BasicSession
struct ReceiverSession final : public fishnets::WebSocketSession
{
void wsReceivedBinary(itlib::span<uint8_t> binary) final override
{
REQUIRE(binary.size() == sizeof(SessionPacket));
memcpy(&m_received.emplace_back(), binary.data(), sizeof(SessionPacket));
}

~ReceiverSession()
{
std::lock_guard l(g_resultMutex);
REQUIRE(g_caseResult);
g_caseResult->push_back(std::move(m_received));
}

std::vector<SessionPacket> m_received;
};

class CaseResultChecker
Expand All @@ -299,7 +296,7 @@ class CaseResultChecker
REQUIRE(!!g_caseResult);
g_caseResult = nullptr;

REQUIRE(m_result.size() == BasicSession::senderRegistry.size());
REQUIRE(m_result.size() == BasicSender::senderRegistry.size());

for (auto& r : m_result)
{
Expand All @@ -313,9 +310,9 @@ class CaseResultChecker
for (size_t i=0; i<m_result.size(); ++i)
{
auto& r = m_result[i];
CHECK(r.front().type == BasicSession::Packet::Type::Id);
CHECK(r.front().type == SessionPacket::Type::Id);
CHECK(r.front().payload == uint32_t(i));
BasicSession::senderRegistry[i].testResult(r);
BasicSender::senderRegistry[i].testResult(r);
}
}
};
Expand All @@ -337,7 +334,7 @@ TEST_CASE("Client heartbeat")
fishnets::WebSocketServer server(Make_ReceiverSession, Test_Port, 2, testServerSSLSettings.get());

std::vector<std::thread> clients;
for (auto& ts : BasicSession::senderRegistry)
for (auto& ts : BasicSender::senderRegistry)
{
clients.emplace_back([&]() {
fishnets::WebSocketClient client(ts.make, testClientSSLSettings.get());
Expand All @@ -359,12 +356,12 @@ TEST_CASE("Server heartbeat")
std::atomic_size_t c = {};
fishnets::WebSocketServer server([&c](const fishnets::WebSocketEndpointInfo& info) {
auto i = c.fetch_add(1);
REQUIRE(i < BasicSession::senderRegistry.size());
return BasicSession::senderRegistry[i].make(info);
REQUIRE(i < BasicSender::senderRegistry.size());
return BasicSender::senderRegistry[i].make(info);
}, Test_Port, 2, testServerSSLSettings.get());

std::vector<std::thread> clients;
for (size_t i=0; i<BasicSession::senderRegistry.size(); ++i)
for (size_t i=0; i < BasicSender::senderRegistry.size(); ++i)
{
clients.emplace_back([&]() {
fishnets::WebSocketClient client(Make_ReceiverSession, testClientSSLSettings.get());
Expand Down

0 comments on commit 45e9626

Please sign in to comment.