Skip to content

Commit

Permalink
better Throttle strategy to guarantee the CPS (calls per second) limit
Browse files Browse the repository at this point in the history
  • Loading branch information
janwilmans committed Dec 12, 2016
1 parent 2d040f7 commit fd3a38a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 28 deletions.
28 changes: 18 additions & 10 deletions CobaltFusion/Throttle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,33 @@ using namespace std::chrono_literals;

Throttle::Throttle(IExecutor& executor, int callsPerSecond) :
m_executor(executor),
m_delta(std::chrono::milliseconds(1000/callsPerSecond))
m_delta(std::chrono::milliseconds(1000/callsPerSecond)),
m_callPending(false)
{
}

void Throttle::Call(std::function<void()> fn)
{
if (m_scheduledCall.is_initialized())
m_lastCallTimePoint = Clock::now();
if (!m_callPending)
{
if ((Clock::now() - m_lastScheduleCallTimePoint) > m_delta /2 )
{
m_scheduledCall.get().Cancel();
m_scheduledCall.reset();
m_executor.CallAsync([=] { fn(); });
}
m_lastScheduledCallTimePoint = m_lastCallTimePoint;
m_callPending = true;
m_executor.CallAt(Clock::now() + m_delta, [this, fn] { PendingCall(fn); });
}
}

void Throttle::PendingCall(std::function<void()> fn)
{
fn();
if (m_lastCallTimePoint > m_lastScheduledCallTimePoint)
{
m_lastScheduledCallTimePoint = Clock::now();
m_executor.CallAt(Clock::now() + m_delta, [this, fn] { PendingCall(fn); });
}
else
{
m_lastScheduleCallTimePoint = Clock::now() + m_delta;
m_scheduledCall = m_executor.CallAt(m_lastScheduleCallTimePoint, [=] { fn(); });
m_callPending = false;
}
}

Expand Down
12 changes: 7 additions & 5 deletions CobaltFusionTest/CobaltFusionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ std::ostream & operator<<(std::ostream &os, const std::chrono::steady_clock::dur

BOOST_AUTO_TEST_CASE(ThrottleTest)
{
auto exec = std::make_unique<ActiveExecutorClient>();
ActiveExecutorClient exec;
const int testCPS = 20;
Throttle throttle(*exec, testCPS); // max calls per second
Throttle throttle(exec, testCPS); // max calls per second
using namespace std::chrono_literals;
using namespace std::chrono;

Expand All @@ -318,7 +318,9 @@ BOOST_AUTO_TEST_CASE(ThrottleTest)
}
std::this_thread::sleep_for(1ms*randomdelay(gen));
}
exec.reset();

// workaround to wait for any pending calls, since we dont have ExecutorClient::Flush()
std::this_thread::sleep_for(200ms);

auto testtime = ActiveExecutorClient::Clock::now() - start;
auto testtestMs = duration_cast<milliseconds>(testtime).count();
Expand All @@ -327,8 +329,8 @@ BOOST_AUTO_TEST_CASE(ThrottleTest)
auto lastDelta = lastexecutionTime - lastcallTime;
std::cout << "Last execution was " << lastDelta << " after last call.\n";
std::cout << "Called " << counter << " times over " << testtime << ", " << callsPerSecond << "cps\n";
BOOST_CHECK_LT(callsPerSecond, testCPS + 5);
//BOOST_CHECK_GT(lastDelta.count(), 0);
BOOST_CHECK_LT(callsPerSecond, testCPS);
BOOST_CHECK_GT(lastDelta.count(), 0);
}


Expand Down
8 changes: 4 additions & 4 deletions DebugView++Lib/LogSources.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ LogSources::LogSources(IExecutor& executor, bool startListening) :
m_loopback(CreateLoopback(m_timer, m_linebuffer)),
m_updatePending(false),
m_executor(executor),
m_throttle(m_executor, 20),
m_throttle(m_executor, 25),
m_listenThread(startListening ? std::make_unique<fusion::thread>([this] { Listen(); }) : nullptr)
{
m_processMonitor.ConnectProcessEnded([this](DWORD pid, HANDLE handle) { OnProcessEnded(pid, handle); });
Expand Down Expand Up @@ -154,9 +154,9 @@ const std::chrono::milliseconds graceTime(40); // -> intentionally near what the

void LogSources::OnUpdate()
{
//m_throttle.Call([&] { m_update(); });
m_updatePending = true;
m_executor.CallAfter(graceTime, [this]() { DelayedUpdate(); });
m_throttle.Call([&] { m_update(); });
//m_updatePending = true;
//m_executor.CallAfter(graceTime, [this]() { DelayedUpdate(); });
}

void LogSources::DelayedUpdate()
Expand Down
13 changes: 4 additions & 9 deletions include/CobaltFusion/Throttle.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,10 @@
#include <chrono>
#include <functional>
#include "boost/optional.hpp"
#include "boost/signals2.hpp"
#include "CobaltFusion/ExecutorClient.h"

namespace fusion {

// todo: define a frequency type to specifiy 'calls per second' ?
//constexpr unsigned long long operator "" _Hz(unsigned long long l)
//{
// return static_cast<unsigned long long>(l);
//}

class Throttle
{
public:
Expand All @@ -29,11 +22,13 @@ class Throttle

Throttle(IExecutor& executor, int callsPerSecond);
void Call(std::function<void()> fn);
void PendingCall(std::function<void()> fn);

private:
boost::optional<ScheduledCall> m_scheduledCall;
Clock::duration m_delta;
Clock::time_point m_lastScheduleCallTimePoint;
Clock::time_point m_lastCallTimePoint;
Clock::time_point m_lastScheduledCallTimePoint;
bool m_callPending;
IExecutor& m_executor;
};

Expand Down

0 comments on commit fd3a38a

Please sign in to comment.