Skip to content

Commit

Permalink
Optionally limit outstanding data
Browse files Browse the repository at this point in the history
  • Loading branch information
manuelbl committed Apr 11, 2022
1 parent 5275f65 commit c1290dd
Showing 1 changed file with 37 additions and 3 deletions.
40 changes: 37 additions & 3 deletions test/loopback-linux/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include "prng.hpp"
#include "serial.hpp"
#include <algorithm>
#include <condition_variable>
#include <iomanip>
#include <mutex>
#include <thread>

using namespace std::chrono;
Expand All @@ -37,11 +39,15 @@ static int bit_rate;
static int data_bits;
static bool with_parity;
static int rx_delay;
static int max_outstanding_bytes;

static serial_port send_port;
static serial_port recv_port;
static volatile bool test_cancelled = false;

static int outstanding_bytes;
std::mutex outstanding_data_mutex; // protects outstanding_bytes
std::condition_variable outstanding_data_condition; // to be used with outstanding_data_mutex

/**
* Checks the program arguments
Expand Down Expand Up @@ -153,6 +159,7 @@ int check_usage(int argc, char* argv[]) {
("p,parity", "Enable parity bit")
("d,databits", "Data bits (7 or 8)", cxxopts::value<int>()->default_value("8"))
("s,rx-sleep", "Sleep before reception (in s)", cxxopts::value<int>()->default_value("0"))
("o,outstanding", "Maximum data outstanding in transit (in bytes)", cxxopts::value<int>()->default_value("999999999"))
("h,help", "Show usage");
options.positional_help("tx-port [ rx-port ]").show_positional_help();

Expand All @@ -175,6 +182,7 @@ int check_usage(int argc, char* argv[]) {
data_bits = result["databits"].as<int>();
send_port_path = result["tx-port"].as<std::string>();
rx_delay = result["rx-sleep"].as<int>();
max_outstanding_bytes = result["outstanding"].as<int>();
with_parity = result.count("parity") > 0;
if (with_parity)
data_bits = std::min(std::max(data_bits, 7), 8);
Expand All @@ -198,7 +206,7 @@ int check_usage(int argc, char* argv[]) {

void send() {
prng prandom(PRNG_INIT);
uint8_t buf[128];
uint8_t buf[64];

try {

Expand All @@ -208,8 +216,26 @@ void send() {
prandom.fill(buf, m);
if (data_bits == 7)
clear_high_bit(buf, m);

// wait until outstanding data is low enough to send next chunk
{
std::unique_lock<std::mutex> lock(outstanding_data_mutex);
outstanding_data_condition.wait(lock, []{
return outstanding_bytes + sizeof(buf) <= max_outstanding_bytes
|| test_cancelled;
});
if (test_cancelled)
return;
}

send_port.transmit(buf, m);
n -= m;

// update outstanding data
{
std::unique_lock<std::mutex> lock(outstanding_data_mutex);
outstanding_bytes += m;
}
}
}
catch (serial_error& error) {
Expand All @@ -220,8 +246,8 @@ void send() {


void recv() {
uint8_t buf[128];
uint8_t expected[128];
uint8_t buf[64];
uint8_t expected[64];
prng prandom(PRNG_INIT);

try {
Expand All @@ -234,6 +260,13 @@ void recv() {
test_cancelled = true;
return;
}

// update outstanding data and notify sender
{
std::unique_lock<std::mutex> lock(outstanding_data_mutex);
outstanding_bytes -= k;
}
outstanding_data_condition.notify_one();

prandom.fill(expected, k);
if (data_bits == 7)
Expand All @@ -252,6 +285,7 @@ void recv() {
catch (serial_error& error) {
std::cerr << error.what() << std::endl;
test_cancelled = true;
outstanding_data_condition.notify_one();
}
}

Expand Down

0 comments on commit c1290dd

Please sign in to comment.