forked from facebook/wdt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathThrottler.cpp
123 lines (122 loc) · 5 KB
/
Throttler.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include "Throttler.h"
#include "WdtOptions.h"
const int64_t kMillisecsPerSec = 1000;
const double kMbToB = 1024 * 1024;
namespace facebook {
namespace wdt {
Throttler::Throttler(Clock::time_point start, double avgRateBytesPerSec,
double peakRateBytesPerSec, double bucketLimitBytes,
double throttlerLogTimeMillis)
: startTime_(start),
bytesProgress_(0),
avgRateBytesPerSec_(avgRateBytesPerSec) {
bucketRateBytesPerSec_ = peakRateBytesPerSec;
bytesTokenBucketLimit_ = 2 * bucketRateBytesPerSec_ * 0.25;
/* We keep the number of tokens generated as zero initially
* It could be argued that we keep this filled when we created the
* bucket. However the startTime is passed in this case and the hope is
* that we will have enough number of tokens by the time we send the data
*/
bytesTokenBucket_ = 0;
if (bucketLimitBytes > 0) {
bytesTokenBucketLimit_ = bucketLimitBytes;
}
if (avgRateBytesPerSec > 0) {
LOG(INFO) << "Average rate " << avgRateBytesPerSec_ / kMbToB;
} else {
LOG(INFO) << "No average rate specified";
}
if (bucketRateBytesPerSec_ > 0) {
LOG(INFO) << "Peak rate " << bucketRateBytesPerSec_ / kMbToB
<< " mbytes / seconds. Bucket limit "
<< bytesTokenBucketLimit_ / kMbToB << " mbytes.";
} else {
LOG(INFO) << "No peak rate specified";
}
instantProgress_ = 0;
lastFillTime_ = start;
isTokenBucketEnabled = (bucketRateBytesPerSec_ > 0);
throttlerLogTimeMillis_ = throttlerLogTimeMillis;
}
void Throttler::limit(double bytesTotalProgress) {
double deltaProgress = bytesTotalProgress - bytesProgress_;
bytesProgress_ = bytesTotalProgress;
std::chrono::time_point<Clock> now = Clock::now();
double avgThrottlerSleep = averageThrottler(now);
const bool hasSlept = (avgThrottlerSleep > 0);
double sleepTimeSeconds = 0;
if (isTokenBucketEnabled && !hasSlept) {
std::chrono::duration<double> elapsedDuration = now - lastFillTime_;
lastFillTime_ = now;
double elapsedSeconds = elapsedDuration.count();
bytesTokenBucket_ += elapsedSeconds * bucketRateBytesPerSec_;
if (bytesTokenBucket_ > bytesTokenBucketLimit_) {
bytesTokenBucket_ = bytesTokenBucketLimit_;
}
bytesTokenBucket_ -= deltaProgress;
if (bytesTokenBucket_ < 0) {
/*
* If we have negative number of tokens lets sleep
* This way we will have positive number of tokens next time
*/
sleepTimeSeconds = -1.0 * bytesTokenBucket_ / bucketRateBytesPerSec_;
VLOG(1) << "Peak throttler wants to sleep " << sleepTimeSeconds
<< " seconds";
std::this_thread::sleep_for(
std::chrono::duration<double>(sleepTimeSeconds));
}
}
sleepTimeSeconds += avgThrottlerSleep;
if (throttlerLogTimeMillis_ > 0) {
printPeriodicLogs(deltaProgress, now, sleepTimeSeconds);
}
}
void Throttler::printPeriodicLogs(double deltaProgress,
const Clock::time_point &now,
double sleepTimeSeconds) {
instantProgress_ += deltaProgress;
/*
* This is the part where throttler prints out the progress
* made periodically.
*/
std::chrono::duration<double> elapsedLogDuration = now - lastLogTime_;
double elapsedLogSeconds = elapsedLogDuration.count() + sleepTimeSeconds;
if (elapsedLogSeconds * kMillisecsPerSec >= throttlerLogTimeMillis_) {
std::chrono::duration<double> elapsedAvgDuration = now - startTime_;
double elapsedAvgSeconds = elapsedAvgDuration.count() + sleepTimeSeconds;
double instantBytesPerSec = instantProgress_ / (double)elapsedLogSeconds;
double avgBytesPerSec = (bytesProgress_) / elapsedAvgSeconds;
LOG(INFO) << "Throttler:Transfer_Rates::"
<< " " << elapsedAvgSeconds << " " << avgBytesPerSec / kMbToB
<< " " << instantBytesPerSec / kMbToB;
lastLogTime_ = now;
instantProgress_ = 0;
}
}
double Throttler::averageThrottler(const Clock::time_point &now) {
std::chrono::duration<double> elapsedDuration = now - startTime_;
double elapsedSeconds = elapsedDuration.count();
if (avgRateBytesPerSec_ <= 0) {
VLOG(1) << "There is no rate limit";
return 0;
}
const double allowedProgress = avgRateBytesPerSec_ * elapsedSeconds;
if (bytesProgress_ > allowedProgress) {
double idealTime = bytesProgress_ / avgRateBytesPerSec_;
const double sleepTimeSeconds = idealTime - elapsedSeconds;
VLOG(1) << "Throttler : Elapsed " << elapsedSeconds
<< " seconds. Made progress " << bytesProgress_ / kMbToB
<< " mbytes in " << elapsedSeconds
<< " seconds, maximum allowed progress for this duration is "
<< allowedProgress / kMbToB << " mbytes. Mean Rate allowed is "
<< avgRateBytesPerSec_ / kMbToB
<< " mbytes per seconds. Sleeping for " << sleepTimeSeconds
<< " seconds";
std::this_thread::sleep_for(
std::chrono::duration<double>(sleepTimeSeconds));
return sleepTimeSeconds;
}
return 0;
}
}
} // namespace facebook::wormhole