Skip to content

Commit

Permalink
feat: Rework the buffer cleanup to happen post-handling of packets
Browse files Browse the repository at this point in the history
  • Loading branch information
whisperity committed May 18, 2022
1 parent 210dcc9 commit 76327d4
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 109 deletions.
72 changes: 19 additions & 53 deletions include/core/monomux/adt/RingBuffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,44 +44,11 @@ class RingBufferBase

const std::size_t OriginalCapacity;

struct SuccessfulBufferlessAccess
{
std::chrono::time_point<std::chrono::system_clock> Created;
std::chrono::time_point<std::chrono::system_clock> Last;

SuccessfulBufferlessAccess() : Created(std::chrono::system_clock::now())
{
stamp();
}

void stamp() noexcept { Last = std::chrono::system_clock::now(); }
};

public:
std::size_t capacity() const noexcept { return Capacity; }
std::size_t size() const noexcept { return Size; }
bool empty() const noexcept { return Size == 0; }

/// Clients to a \p RingBuffer might be able to do the associated operation
/// without making use of the buffer. The shrinking heuristic only calculates
/// if the buffer is accessed, which means that direct, buffer-not-using
/// operations will not trigger a potential \p shrink(), which keeps a grown
/// buffer's allocation alive even if there is no need for that much space.
///
/// This method is used to signal the heuristic that the client was able to
/// do something without involving the buffer. Calling this method has no
/// effect on the actual contents of the buffer, if there is any.
void signalStatisticForSuccessfulUnbufferedAccess() noexcept
{
if (Size != 0)
return;
if (!SuccessfulBufferlessAccessCounter)
SuccessfulBufferlessAccessCounter.emplace();
else
SuccessfulBufferlessAccessCounter->stamp();
mayBeValley();
}

protected:
/// The physical size of the allocated buffer.
std::size_t Capacity = 0;
Expand All @@ -91,6 +58,7 @@ class RingBufferBase
RingBufferBase(std::size_t Capacity)
: OriginalCapacity(Capacity), Capacity(Capacity)
{
markAccess();
SizePeaks.resize(((Capacity / Kilo) + 2) * 1);
}

Expand All @@ -99,23 +67,27 @@ class RingBufferBase
void incSize() noexcept
{
++Size;
markAccess();
mayBePeak();
}
void decSize() noexcept
{
assert(Size != 0);
--Size;
markAccess();
mayBeValley();
}
void addSize(std::size_t N) noexcept
{
Size += N;
markAccess();
mayBePeak();
}
void subSize(std::size_t N) noexcept
{
assert(N <= Size);
Size -= N;
markAccess();
mayBeValley();
}
void zeroSize() noexcept
Expand All @@ -132,17 +104,17 @@ class RingBufferBase
if (Capacity <= OriginalCapacity)
return false;

// For each KiB of buffer space, we need at least this many successful
// small accesses before the shrinking of the buffer is considered.
static constexpr std::size_t TimeThreshold = 30;
if (SuccessfulBufferlessAccessCounter &&
SuccessfulBufferlessAccessCounter->Last -
SuccessfulBufferlessAccessCounter->Created >=
std::chrono::seconds(TimeThreshold))
static constexpr std::size_t TimeThresholdSeconds = 60;
if (std::chrono::system_clock::now() - LastAccess >=
std::chrono::seconds(TimeThresholdSeconds))
// Consider the buffer for shrinking if operations were successful without
// it "too many" times.
// accessing the buffer for a sufficient amount of time.
return true;

// Otherwise, if the buffer is continously used for a sufficient amount of
// time, consider shrinking it in case at least half of the recent peaks
// (inbetween each valley, i.e. zeroing the size) would've fit into the
// original buffer capacity too.
std::size_t ZeroPeaks = 0;
std::size_t SufficientlySmallPeaks = 0;
for (std::size_t Peak : SizePeaks)
Expand All @@ -160,7 +132,6 @@ class RingBufferBase
for (std::size_t& SPV : SizePeaks)
SPV = 0;
CurrentPeakIndex = 0;
SuccessfulBufferlessAccessCounter.reset();
mayBePeak();
}

Expand All @@ -170,15 +141,16 @@ class RingBufferBase
std::vector<std::size_t> SizePeaks;
std::size_t CurrentPeakIndex = 0;

std::optional<SuccessfulBufferlessAccess> SuccessfulBufferlessAccessCounter;
std::chrono::time_point<std::chrono::system_clock> LastAccess;

void markAccess() noexcept { LastAccess = std::chrono::system_clock::now(); }

/// Marks the current size as the peak of the current zone, if sufficient.
void mayBePeak() noexcept
{
if (Size == 0 || Capacity <= OriginalCapacity)
return;

SuccessfulBufferlessAccessCounter.reset();
if (Size > SizePeaks[CurrentPeakIndex])
SizePeaks[CurrentPeakIndex] = Size;
}
Expand All @@ -194,15 +166,9 @@ class RingBufferBase

++CurrentPeakIndex;
if (CurrentPeakIndex >= SizePeaks.size())
{
// If reached the end of the measurement, drop half of the measurement
// data.
auto NewMiddle = std::rotate(SizePeaks.begin(),
SizePeaks.begin() + SizePeaks.size() / 2 + 1,
SizePeaks.end());
std::fill(NewMiddle, SizePeaks.end(), 0);
CurrentPeakIndex = NewMiddle - SizePeaks.begin();
}
// If we reached the end of the measurement vector, restart from the
// start. This is a little ring buffer of itself. :)
CurrentPeakIndex = 0;
}
};

Expand Down
28 changes: 16 additions & 12 deletions include/core/monomux/server/SessionData.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,22 @@ class SessionData
/// the connection towards the session.
raw_fd getIdentifyingFD() const noexcept;

/// Reads at most \p Size bytes from the output of the program running in the
/// session, if any.
std::string readOutput(std::size_t Size);

/// \returns whether there is buffered output available on the output pipe
/// from the session, i.e. further \p readOutput() calls could be made without
/// interacting with the underlying device.
bool stillHasOutput() noexcept;

/// Sends the \p Data as input to the program running in the session, if any.
/// \returns the number of bytes written.
std::size_t sendInput(std::string_view Data);
/// \returns the connection through which data can be read from the session,
/// if there is any.
Pipe* getReader() noexcept
{
if (!hasProcess() || !getProcess().hasPty())
return nullptr;
return &getProcess().getPty()->reader();
}
/// \returns the connection through which data can be sent to the session,
/// if there is any.
Pipe* getWriter() noexcept
{
if (!hasProcess() || !getProcess().hasPty())
return nullptr;
return &getProcess().getPty()->writer();
}

const std::vector<ClientData*>& getAttachedClients() const noexcept
{
Expand Down
5 changes: 5 additions & 0 deletions include/core/monomux/system/BufferedChannel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ class BufferedChannel : public Channel
/// \returns the number of bytes already written but not yet flushed.
std::size_t writeInBuffer() const noexcept;

/// Attempts to automatically free auto-growing memory resources associated
/// with the buffer(s), if it is possible and deemed meaningful. This is a
/// heuristics-based call that does not always actually free resources.
void tryFreeResources();

protected:
UniqueScalar<OpaqueBufferType*, nullptr> Read;
UniqueScalar<OpaqueBufferType*, nullptr> Write;
Expand Down
2 changes: 2 additions & 0 deletions src/client/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ void Client::loop()
if (ExternalEventProcessor)
// Process "external" events before blocking on "wait()".
ExternalEventProcessor(*this);
ControlSocket.tryFreeResources();
DataSocket->tryFreeResources();

const std::size_t NumTriggeredFDs = Poll->wait();
for (std::size_t I = 0; I < NumTriggeredFDs; ++I)
Expand Down
2 changes: 2 additions & 0 deletions src/client/Terminal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ void Terminal::clientInput(Terminal* Term, Client& Client)

Client.sendData(Input);
} while (Term->input()->hasBufferedRead());
Term->input()->tryFreeResources();
}

void Terminal::clientOutput(Terminal* Term, Client& Client)
Expand All @@ -137,6 +138,7 @@ void Terminal::clientOutput(Terminal* Term, Client& Client)

while (Term->output()->hasBufferedWrite())
Term->output()->flushWrites();
Term->output()->tryFreeResources();
}

void Terminal::clientEventReady(Terminal* Term, Client& Client)
Expand Down
37 changes: 28 additions & 9 deletions src/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,20 @@ void Server::loop()
{
if (auto* Session = std::get_if<SessionConnection>(Entity))
{
SessionData& S = **Session;
if (Event.Incoming)
{
// First check for data coming from a session. This is the most
// populous in terms of bandwidth.
dataCallback(**Session);
dataCallback(S);
S.getReader()->tryFreeResources();
}
if (Event.Outgoing)
{
try
{
(**Session).sendInput(std::string_view{});
S.getWriter()->flushWrites();
S.getWriter()->tryFreeResources();
}
catch (const buffer_overflow& BO)
{
Expand All @@ -187,22 +192,34 @@ void Server::loop()
}
if (auto* Data = std::get_if<ClientDataConnection>(Entity))
{
ClientData& C = **Data;
auto ClientID = C.id();

if (Event.Incoming)
// Second, try to see if the data is coming from a client, like
// keypresses and such. We expect to see many of these, too.
dataCallback(**Data);
dataCallback(C);
if (Event.Outgoing)
flushAndReschedule(*Poll, *(**Data).getDataSocket());
flushAndReschedule(*Poll, *C.getDataSocket());

if (Clients.find(ClientID) != Clients.end())
C.getDataSocket()->tryFreeResources();
continue;
}
if (auto* Control = std::get_if<ClientControlConnection>(Entity))
{
ClientData& C = **Control;
auto ClientID = C.id();

if (Event.Incoming)
// Lastly, check if the receive is happening on the control
// connection, where messages are small and far inbetween.
controlCallback(**Control);
controlCallback(C);
if (Event.Outgoing)
flushAndReschedule(*Poll, (**Control).getControlSocket());
flushAndReschedule(*Poll, C.getControlSocket());

if (Clients.find(ClientID) != Clients.end())
C.getControlSocket().tryFreeResources();
continue;
}
}
Expand Down Expand Up @@ -394,6 +411,8 @@ void Server::controlCallback(ClientData& Client)

if (ClientSock.hasBufferedRead())
Poll->schedule(ClientSock.raw(), /* Incoming =*/true, /* Outgoing =*/false);
else
ClientSock.tryFreeResources();

if (Data.empty())
return;
Expand Down Expand Up @@ -478,7 +497,7 @@ void Server::dataCallback(ClientData& Client)
if (SessionData* S = Client.getAttachedSession())
try
{
S->sendInput(Data);
S->getWriter()->write(Data);
}
catch (const buffer_overflow& BO)
{
Expand Down Expand Up @@ -526,7 +545,7 @@ void Server::dataCallback(SessionData& Session)
std::string Data;
try
{
Data = Session.readOutput(BufferSize);
Data = Session.getReader()->read(BufferSize);
}
catch (const buffer_overflow& BO)
{
Expand All @@ -543,7 +562,7 @@ void Server::dataCallback(SessionData& Session)
return;
}

if (Session.stillHasOutput())
if (Session.getReader()->hasBufferedRead())
Poll->schedule(Session.getIdentifyingFD(),
/* Incoming =*/true,
/* Outgoing =*/false);
Expand Down
20 changes: 0 additions & 20 deletions src/server/SessionData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,6 @@ raw_fd SessionData::getIdentifyingFD() const noexcept
return P.getPty()->raw().get();
}

std::string SessionData::readOutput(std::size_t Size)
{
if (!hasProcess() || !getProcess().hasPty())
return {};
return getProcess().getPty()->reader().read(Size);
}

bool SessionData::stillHasOutput() noexcept
{
return hasProcess() && getProcess().getPty() &&
getProcess().getPty()->reader().hasBufferedRead();
}

std::size_t SessionData::sendInput(std::string_view Data)
{
if (!hasProcess() || !getProcess().hasPty())
return 0;
return getProcess().getPty()->writer().write(Data);
}

ClientData* SessionData::getLatestClient() const
{
MONOMUX_TRACE_LOG(LOG(trace) << "Searching latest active client of \"" << Name
Expand Down
Loading

0 comments on commit 76327d4

Please sign in to comment.