Skip to content

Commit

Permalink
Adding rate limiters for zippy read/write calls
Browse files Browse the repository at this point in the history
Summary:
Adding rate limiters for zippy read/write calls.
Reusing wdt throttler and adding a wrapper around it. Wdt throttler is thread
safe and fair (threads working at similar speed gets same amount of slots).
Throttler speeds can be changed dynamically through configerator.

Differential Revision: D5582254

fbshipit-source-id: 66df2a226ac22ca40a2974bcb46f231dc537aa32
  • Loading branch information
uddipta authored and facebook-github-bot committed Aug 9, 2017
1 parent 82dd567 commit 87b03e2
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 145 deletions.
167 changes: 77 additions & 90 deletions Throttler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,63 +19,56 @@ const double kPeakMultiplier = 1.2;
const int kBucketMultiplier = 2;
const double kTimeMultiplier = 0.25;

std::shared_ptr<Throttler> Throttler::makeThrottler(const WdtOptions& options) {
double avgRateBytesPerSec = options.avg_mbytes_per_sec * kMbToB;
double peakRateBytesPerSec = options.max_mbytes_per_sec * kMbToB;
double bucketLimitBytes = options.throttler_bucket_limit * kMbToB;
int64_t singleRequestLimit =
options.buffer_size; // Throttler limit is generally called with
// buffer_size amount of data
std::shared_ptr<Throttler> Throttler::makeThrottler(
const ThrottlerOptions& options) {
double avgRatePerSec = options.avg_rate_per_sec;
double peakRatePerSec = options.max_rate_per_sec;
double bucketLimit = options.throttler_bucket_limit;
int64_t singleRequestLimit = options.single_request_limit;
Throttler* throttler =
new Throttler(avgRateBytesPerSec, peakRateBytesPerSec, bucketLimitBytes,
new Throttler(avgRatePerSec, peakRatePerSec, bucketLimit,
singleRequestLimit, options.throttler_log_time_millis);
return std::shared_ptr<Throttler>(throttler);
}

void Throttler::configureOptions(double& avgRateBytesPerSec,
double& peakRateBytesPerSec,
double& bucketLimitBytes) {
if (peakRateBytesPerSec < avgRateBytesPerSec && peakRateBytesPerSec >= 0) {
void Throttler::configureOptions(double& avgRatePerSec, double& peakRatePerSec,
double& bucketLimit) {
if (peakRatePerSec < avgRatePerSec && peakRatePerSec >= 0) {
WLOG(WARNING) << "Per thread peak rate should be greater "
<< "than per thread average rate. "
<< "Making peak rate 1.2 times the average rate";
peakRateBytesPerSec = kPeakMultiplier * (double)avgRateBytesPerSec;
peakRatePerSec = kPeakMultiplier * (double)avgRatePerSec;
}
if (bucketLimitBytes <= 0 && peakRateBytesPerSec > 0) {
bucketLimitBytes =
kTimeMultiplier * kBucketMultiplier * peakRateBytesPerSec;
if (bucketLimit <= 0 && peakRatePerSec > 0) {
bucketLimit = kTimeMultiplier * kBucketMultiplier * peakRatePerSec;
WLOG(INFO) << "Burst limit not specified but peak "
<< "rate is configured. Auto configuring to "
<< bucketLimitBytes / kMbToB << " Mbytes";
<< "rate is configured. Auto configuring to " << bucketLimit;
}
}

Throttler::Throttler(double avgRateBytesPerSec, double peakRateBytesPerSec,
double bucketLimitBytes, int64_t singleRequestLimit,
Throttler::Throttler(double avgRatePerSec, double peakRatePerSec,
double bucketLimit, int64_t singleRequestLimit,
int64_t throttlerLogTimeMillis)
: avgRateBytesPerSec_(avgRateBytesPerSec) {
bucketRateBytesPerSec_ = peakRateBytesPerSec;
bytesTokenBucketLimit_ =
kTimeMultiplier * kBucketMultiplier * bucketRateBytesPerSec_;
: avgRatePerSec_(avgRatePerSec) {
bucketRatePerSec_ = peakRatePerSec;
tokenBucketLimit_ = kTimeMultiplier * kBucketMultiplier * bucketRatePerSec_;
/* We keep the number of tokens generated as zero initially
* It could be argued that we keep this filled when we created the
* bucket. However the startTime is passed in this case and the hope is
* that we will have enough number of tokens by the time we send the data
*/
bytesTokenBucket_ = 0;
if (bucketLimitBytes > 0) {
bytesTokenBucketLimit_ = bucketLimitBytes;
tokenBucket_ = 0;
if (bucketLimit > 0) {
tokenBucketLimit_ = bucketLimit;
}
if (avgRateBytesPerSec > 0) {
WLOG(INFO) << "Average rate " << avgRateBytesPerSec_ / kMbToB
<< " Mbytes/sec";
if (avgRatePerSec > 0) {
WLOG(INFO) << "Average rate " << avgRatePerSec_;
} else {
WLOG(INFO) << "No average rate specified";
}
if (bucketRateBytesPerSec_ > 0) {
WLOG(INFO) << "Peak rate " << bucketRateBytesPerSec_ / kMbToB
<< " Mbytes/sec. Bucket limit "
<< bytesTokenBucketLimit_ / kMbToB << " Mbytes.";
if (bucketRatePerSec_ > 0) {
WLOG(INFO) << "Peak rate " << bucketRatePerSec_ << ". Bucket limit "
<< tokenBucketLimit_;
} else {
WLOG(INFO) << "No peak rate specified";
}
Expand All @@ -84,30 +77,29 @@ Throttler::Throttler(double avgRateBytesPerSec, double peakRateBytesPerSec,
throttlerLogTimeMillis_ = throttlerLogTimeMillis;
}

void Throttler::setThrottlerRates(double& avgRateBytesPerSec,
double& bucketRateBytesPerSec,
double& bytesTokenBucketLimit) {
void Throttler::setThrottlerRates(double& avgRatePerSec,
double& bucketRatePerSec,
double& tokenBucketLimit) {
// configureOptions will change the rates in case they don't make
// sense
configureOptions(avgRateBytesPerSec, bucketRateBytesPerSec,
bytesTokenBucketLimit);
configureOptions(avgRatePerSec, bucketRatePerSec, tokenBucketLimit);
folly::SpinLockGuard lock(throttlerMutex_);

resetState();

WLOG(INFO) << "Updating the rates avgRateBytesPerSec : " << avgRateBytesPerSec
<< " bucketRateBytesPerSec : " << bucketRateBytesPerSec
<< " bytesTokenBucketLimit : " << bytesTokenBucketLimit;
avgRateBytesPerSec_ = avgRateBytesPerSec;
bucketRateBytesPerSec_ = bucketRateBytesPerSec;
bytesTokenBucketLimit_ = bytesTokenBucketLimit;
WLOG(INFO) << "Updating the rates avgRatePerSec : " << avgRatePerSec
<< " bucketRatePerSec : " << bucketRatePerSec
<< " tokenBucketLimit : " << tokenBucketLimit;
avgRatePerSec_ = avgRatePerSec;
bucketRatePerSec_ = bucketRatePerSec;
tokenBucketLimit_ = tokenBucketLimit;
}

void Throttler::setThrottlerRates(const WdtOptions& options) {
double avgRateBytesPerSec = options.avg_mbytes_per_sec * kMbToB;
double peakRateBytesPerSec = options.max_mbytes_per_sec * kMbToB;
double bucketLimitBytes = options.throttler_bucket_limit * kMbToB;
setThrottlerRates(avgRateBytesPerSec, peakRateBytesPerSec, bucketLimitBytes);
void Throttler::setThrottlerRates(const ThrottlerOptions& options) {
double avgRatePerSec = options.avg_rate_per_sec;
double peakRatePerSec = options.max_rate_per_sec;
double bucketLimit = options.throttler_bucket_limit;
setThrottlerRates(avgRatePerSec, peakRatePerSec, bucketLimit);
}

void Throttler::limit(ThreadCtx& threadCtx, int64_t deltaProgress) {
Expand Down Expand Up @@ -166,29 +158,28 @@ double Throttler::calculateSleep(double deltaProgress,
WLOG(ERROR) << "Using the throttler without registering the transfer";
return -1;
}
bytesProgress_ += deltaProgress;
progress_ += deltaProgress;
double avgThrottlerSleep = averageThrottler(now);
const bool willSleep = (avgThrottlerSleep > 0);
if (willSleep) {
return avgThrottlerSleep;
}
// we still hold the lock if peak throttler can come into effect
if ((bucketRateBytesPerSec_ > 0) && (bytesTokenBucketLimit_ > 0)) {
if ((bucketRatePerSec_ > 0) && (tokenBucketLimit_ > 0)) {
std::chrono::duration<double> elapsedDuration = now - lastFillTime_;
lastFillTime_ = now;
double elapsedSeconds = elapsedDuration.count();
bytesTokenBucket_ += elapsedSeconds * bucketRateBytesPerSec_;
if (bytesTokenBucket_ > bytesTokenBucketLimit_) {
bytesTokenBucket_ = bytesTokenBucketLimit_;
tokenBucket_ += elapsedSeconds * bucketRatePerSec_;
if (tokenBucket_ > tokenBucketLimit_) {
tokenBucket_ = tokenBucketLimit_;
}
bytesTokenBucket_ -= deltaProgress;
if (bytesTokenBucket_ < 0) {
tokenBucket_ -= deltaProgress;
if (tokenBucket_ < 0) {
/*
* If we have negative number of tokens lets sleep
* This way we will have positive number of tokens next time
*/
double peakThrottlerSleep =
-1.0 * bytesTokenBucket_ / bucketRateBytesPerSec_;
double peakThrottlerSleep = -1.0 * tokenBucket_ / bucketRatePerSec_;
WVLOG(2) << "Peak throttler wants to sleep " << peakThrottlerSleep
<< " seconds";
return peakThrottlerSleep;
Expand All @@ -209,37 +200,36 @@ void Throttler::printPeriodicLogs(const Clock::time_point& now,
elapsedLogDuration = now - lastLogTime_;
double elapsedLogSeconds = elapsedLogDuration.count();
if (elapsedLogSeconds * kMillisecsPerSec >= throttlerLogTimeMillis_) {
double instantBytesPerSec = 0;
instantBytesPerSec = instantProgress_ / elapsedLogSeconds;
double instantRatePerSec = 0;
instantRatePerSec = instantProgress_ / elapsedLogSeconds;
instantProgress_ = 0;
lastLogTime_ = now;
std::chrono::duration<double> elapsedAvgDuration = now - startTime_;
double elapsedAvgSeconds = elapsedAvgDuration.count();
double avgBytesPerSec = bytesProgress_ / elapsedAvgSeconds;
double avgRatePerSec = progress_ / elapsedAvgSeconds;
WLOG(INFO) << "Throttler:Transfer_Rates::"
<< " " << elapsedAvgSeconds << " " << avgBytesPerSec / kMbToB
<< " " << instantBytesPerSec / kMbToB << " " << deltaProgress;
<< " " << elapsedAvgSeconds << " " << avgRatePerSec << " "
<< instantRatePerSec << " " << deltaProgress;
}
}

double Throttler::averageThrottler(const Clock::time_point& now) {
std::chrono::duration<double> elapsedDuration = now - startTime_;
double elapsedSeconds = elapsedDuration.count();
if (avgRateBytesPerSec_ <= 0) {
if (avgRatePerSec_ <= 0) {
WVLOG(2) << "There is no avg rate limit";
return -1;
}
const double allowedProgressBytes = avgRateBytesPerSec_ * elapsedSeconds;
if (bytesProgress_ > allowedProgressBytes) {
double idealTime = bytesProgress_ / avgRateBytesPerSec_;
const double allowedProgress = avgRatePerSec_ * elapsedSeconds;
if (progress_ > allowedProgress) {
double idealTime = progress_ / avgRatePerSec_;
const double sleepTimeSeconds = idealTime - elapsedSeconds;
WVLOG(1) << "Throttler : Elapsed " << elapsedSeconds
<< " seconds. Made progress " << bytesProgress_ / kMbToB
<< " Mbytes in " << elapsedSeconds
<< " seconds. Made progress " << progress_ << " in "
<< elapsedSeconds
<< " seconds, maximum allowed progress for this duration is "
<< allowedProgressBytes / kMbToB
<< " Mbytes. Mean Rate allowed is " << avgRateBytesPerSec_ / kMbToB
<< " Mbytes/sec. Sleeping for " << sleepTimeSeconds << " seconds";
<< allowedProgress << ". Mean Rate allowed is " << avgRatePerSec_
<< " . Sleeping for " << sleepTimeSeconds << " seconds";
return sleepTimeSeconds;
}
return -1;
Expand All @@ -258,8 +248,8 @@ void Throttler::resetState() {
lastFillTime_ = startTime_;
lastLogTime_ = startTime_;
instantProgress_ = 0;
bytesProgress_ = 0;
bytesTokenBucket_ = 0;
progress_ = 0;
tokenBucket_ = 0;
}

void Throttler::endTransfer() {
Expand All @@ -268,24 +258,24 @@ void Throttler::endTransfer() {
refCount_--;
}

double Throttler::getBytesProgress() {
double Throttler::getProgress() {
folly::SpinLockGuard lock(throttlerMutex_);
return bytesProgress_;
return progress_;
}

double Throttler::getAvgRateBytesPerSec() {
double Throttler::getAvgRatePerSec() {
folly::SpinLockGuard lock(throttlerMutex_);
return avgRateBytesPerSec_;
return avgRatePerSec_;
}

double Throttler::getPeakRateBytesPerSec() {
double Throttler::getPeakRatePerSec() {
folly::SpinLockGuard lock(throttlerMutex_);
return bucketRateBytesPerSec_;
return bucketRatePerSec_;
}

double Throttler::getBucketLimitBytes() {
double Throttler::getBucketLimit() {
folly::SpinLockGuard lock(throttlerMutex_);
return bytesTokenBucketLimit_;
return tokenBucketLimit_;
}

int64_t Throttler::getThrottlerLogTimeMillis() {
Expand All @@ -299,13 +289,10 @@ void Throttler::setThrottlerLogTimeMillis(int64_t throttlerLogTimeMillis) {
}

std::ostream& operator<<(std::ostream& stream, const Throttler& throttler) {
stream << "avgRate: " << throttler.avgRateBytesPerSec_ / kMbToB
<< " Mbytes/sec, peakRate: "
<< throttler.bucketRateBytesPerSec_ / kMbToB
<< " Mbytes/sec, bucketLimit: "
<< throttler.bytesTokenBucketLimit_ / kMbToB
<< " Mbytes, throttlerLogTimeMillis: "
<< throttler.throttlerLogTimeMillis_;
stream << "avgRate: " << throttler.avgRatePerSec_
<< ", peakRate: " << throttler.bucketRatePerSec_
<< ", bucketLimit: " << throttler.tokenBucketLimit_
<< ", throttlerLogTimeMillis: " << throttler.throttlerLogTimeMillis_;
return stream;
}
}
Expand Down
Loading

0 comments on commit 87b03e2

Please sign in to comment.