Skip to content

Commit

Permalink
wdt shared throttler across threads
Browse files Browse the repository at this point in the history
Summary: The throttler as it is today no longer works for us, for the following reasons, which this diff addresses:
1. The current throttler is not thread-safe, so even if a thread is slow, other threads can't use the bandwidth allocated to the slower thread.
2. The current throttler needs a start time, which is not applicable in cases such as integration with zippydb, because we want to share one throttler
   for all the recoveries, although it can be done using ref counting.

Changed the throttler test to not use wdt_e2e because it is much easier to look at and change this way.

Reviewed By: @uddipta

Differential Revision: D2094334
  • Loading branch information
nikunjy authored and ldemailly committed Jul 17, 2015
1 parent db2cd6d commit 6882a40
Show file tree
Hide file tree
Showing 8 changed files with 412 additions and 153 deletions.
48 changes: 48 additions & 0 deletions Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,27 @@ std::unique_ptr<TransferReport> Receiver::finish() {
return report;
}

/// Builds the receiver's throttler from the global WdtOptions. Must only be
/// called while no throttler is set; this is enforced by the WDT_CHECK below.
void Receiver::configureThrottler() {
  WDT_CHECK(!throttler_);
  VLOG(1) << "Configuring throttler options";
  const auto &options = WdtOptions::get();
  // Every rate/limit option is scaled by kMbToB before being handed over.
  const double avgRate = options.avg_mbytes_per_sec * kMbToB;
  const double peakRate = options.max_mbytes_per_sec * kMbToB;
  const double bucketLimit = options.throttler_bucket_limit * kMbToB;
  throttler_ = Throttler::makeThrottler(avgRate, peakRate, bucketLimit,
                                        options.throttler_log_time_millis);
  // makeThrottler may hand back an empty pointer; throttling is then off.
  if (!throttler_) {
    LOG(INFO) << "Throttling not enabled";
    return;
  }
  LOG(INFO) << "Enabling throttling " << *throttler_;
}

/**
 * Installs an externally provided throttler for this receiver, replacing
 * whatever throttler was held before.
 *
 * @param throttler  shared throttler to use; start() only configures its own
 *                   throttler when none has been set
 */
void Receiver::setThrottler(std::shared_ptr<Throttler> throttler) {
  // The by-value parameter is already our own reference, so swap it in
  // rather than copy-assigning: this skips an extra atomic ref-count
  // increment/decrement. The previously held throttler (if any) is released
  // when the parameter is destroyed.
  throttler_.swap(throttler);
}

ErrorCode Receiver::transferAsync() {
const auto &options = WdtOptions::get();
if (hasPendingTransfer()) {
Expand Down Expand Up @@ -370,6 +391,12 @@ void Receiver::start() {
for (int i = 0; i < threadServerSockets_.size(); i++) {
threadStats_.emplace_back(true);
}
if (!throttler_) {
configureThrottler();
} else {
LOG(INFO) << "Throttler set externally. Throttler : " << *throttler_;
}

for (int i = 0; i < threadServerSockets_.size(); i++) {
receiverThreads_.emplace_back(&Receiver::receiveOne, this, i,
std::ref(threadServerSockets_[i]), bufferSize,
Expand Down Expand Up @@ -402,6 +429,9 @@ void Receiver::endCurGlobalSession() {
WDT_CHECK(transferFinishedCount_ + 1 == transferStartedCount_);
LOG(INFO) << "Received done for all threads. Transfer session "
<< transferStartedCount_ << " finished";
if (throttler_) {
throttler_->deRegisterTransfer();
}
transferFinishedCount_++;
waitingThreadCount_ = 0;
waitingWithErrorThreadCount_ = 0;
Expand Down Expand Up @@ -429,6 +459,13 @@ bool Receiver::hasNewSessionStarted(ThreadData &data) {

void Receiver::startNewGlobalSession() {
WDT_CHECK(transferStartedCount_ == transferFinishedCount_);
if (throttler_) {
// If a throttler is configured/set then register this session
// in the throttler. This is guaranteed to work in either mode,
// long running or not. We will deregister from the throttler
// when the current session ends.
throttler_->registerTransfer();
}
transferStartedCount_++;
startTime_ = Clock::now();
LOG(INFO) << "New transfer started " << transferStartedCount_;
Expand Down Expand Up @@ -783,6 +820,12 @@ Receiver::ReceiverState Receiver::processFileCmd(ThreadData &data) {
if (enableChecksum) {
checksum = folly::crc32c((const uint8_t *)(buf + off), toWrite, checksum);
}
if (throttler_) {
// We might be reading more than we require for this file but
// throttling should make sense for any additional bytes received
// on the network
throttler_->limit(toWrite + headerBytes);
}
ErrorCode code = writer.write(buf + off, toWrite);
if (code != OK) {
threadStats.setErrorCode(code);
Expand All @@ -794,6 +837,11 @@ Receiver::ReceiverState Receiver::processFileCmd(ThreadData &data) {
while (writer.getTotalWritten() < dataSize) {
int64_t nres = readAtMost(socket, buf, bufferSize,
dataSize - writer.getTotalWritten());
if (nres <= 0) {
  // Nothing was read (nres is zero or negative, presumably EOF or a socket
  // error), so bail out before touching the throttler: a non-positive byte
  // count must not be fed into the shared rate accounting.
  break;
}
if (throttler_) {
  // We only know how much we have read after we are done calling
  // readAtMost. Call throttler with the bytes read off the wire.
  throttler_->limit(nres);
}
Expand Down
10 changes: 10 additions & 0 deletions Receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "ServerSocket.h"
#include "Protocol.h"
#include "Writer.h"
#include "Throttler.h"
#include <memory>
#include <string>
#include <condition_variable>
Expand Down Expand Up @@ -119,6 +120,9 @@ class Receiver {
/// @param progressReporter progress reporter to be used
void setProgressReporter(std::unique_ptr<ProgressReporter> &progressReporter);

/// Set throttler externally. Should be set before any transfer calls
void setThrottler(std::shared_ptr<Throttler> throttler);

protected:
/// receiver state
enum ReceiverState {
Expand Down Expand Up @@ -419,6 +423,9 @@ class Receiver {
/// ends current thread session
void endCurThreadSession(ThreadData &data);

/// Auto configure the throttler if none set externally
void configureThrottler();

/**
* increments failed thread count, does not wait for transfer to finish
*/
Expand Down Expand Up @@ -504,6 +511,9 @@ class Receiver {

/// condition variable to coordinate transfer finish
mutable std::condition_variable conditionAllFinished_;

/// Global throttler across all threads
std::shared_ptr<Throttler> throttler_;
};
}
} // namespace facebook::wdt
106 changes: 46 additions & 60 deletions Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ std::unique_ptr<TransferReport> Sender::getTransferReport() {
return transferReport;
}

/**
 * Installs an externally provided throttler for this sender, replacing
 * whatever throttler was held before.
 *
 * @param throttler  shared throttler to use; start() skips its own throttler
 *                   setup when one has been set
 */
void Sender::setThrottler(std::shared_ptr<Throttler> throttler) {
  VLOG(2) << "Setting an external throttler";
  // The by-value parameter is already our own reference, so swap it in
  // rather than copy-assigning: this skips an extra atomic ref-count
  // increment/decrement. The previously held throttler (if any) is released
  // when the parameter is destroyed.
  throttler_.swap(throttler);
}

bool Sender::isTransferFinished() {
std::unique_lock<std::mutex> lock(mutex_);
return transferFinished_;
Expand All @@ -254,41 +259,21 @@ Clock::time_point Sender::getEndTime() {
return endTime_;
}

void Sender::fillThrottlerOptions(ThrottlerOptions &throttlerOptions) {
void Sender::configureThrottler() {
WDT_CHECK(!throttler_);
VLOG(1) << "Configuring throttler options";
const auto &options = WdtOptions::get();
int numSockets = ports_.size();
double avgRateBytesPerSec = options.avg_mbytes_per_sec * kMbToB;
double peakRateBytesPerSec = options.max_mbytes_per_sec * kMbToB;
double bucketLimitBytes = options.throttler_bucket_limit * kMbToB;
double perThreadAvgRateBytesPerSec = avgRateBytesPerSec / numSockets;
double perThreadPeakRateBytesPerSec = peakRateBytesPerSec / numSockets;
double perThreadBucketLimit = bucketLimitBytes / numSockets;
if (avgRateBytesPerSec < 1.0 && avgRateBytesPerSec >= 0) {
LOG(FATAL) << "Realistic average rate should be greater than 1.0 bytes/sec";
}
if (perThreadPeakRateBytesPerSec < perThreadAvgRateBytesPerSec &&
perThreadPeakRateBytesPerSec >= 0) {
LOG(WARNING) << "Per thread peak rate should be greater "
<< "than per thread average rate. "
<< "Making peak rate 1.2 times the average rate";
perThreadPeakRateBytesPerSec =
kPeakMultiplier * perThreadAvgRateBytesPerSec;
}
if (perThreadBucketLimit <= 0 && perThreadPeakRateBytesPerSec > 0) {
perThreadBucketLimit =
kTimeMultiplier * kBucketMultiplier * perThreadPeakRateBytesPerSec;
LOG(INFO) << "Burst limit not specified but peak "
<< "rate is configured. Auto configuring to "
<< perThreadBucketLimit / kMbToB << " mbytes";
throttler_ = Throttler::makeThrottler(avgRateBytesPerSec, peakRateBytesPerSec,
bucketLimitBytes,
options.throttler_log_time_millis);
if (throttler_) {
LOG(INFO) << "Enabling throttling " << *throttler_;
} else {
LOG(INFO) << "Throttling not enabled";
}
VLOG(1) << "Per thread (Avg Rate, Peak Rate) = "
<< "(" << perThreadAvgRateBytesPerSec << ", "
<< perThreadPeakRateBytesPerSec << ")";

throttlerOptions.avgRateBytes = perThreadAvgRateBytesPerSec;
throttlerOptions.maxRateBytes = perThreadPeakRateBytesPerSec;
throttlerOptions.bucketLimitBytes = perThreadBucketLimit;
}

std::unique_ptr<TransferReport> Sender::finish() {
Expand Down Expand Up @@ -387,7 +372,13 @@ ErrorCode Sender::start() {
if (twoPhases) {
dirThread_.join();
}
fillThrottlerOptions(perThreadThrottlerOptions_);
if (throttler_) {
LOG(INFO) << "Skipping throttler setup. External throttler set."
<< "Throttler details : " << *throttler_;
} else {
configureThrottler();
}

// WARNING: Do not MERGE the following two loops. ThreadTransferHistory keeps a
// reference to TransferStats, and any emplace operation on a vector
// invalidates all its references
Expand Down Expand Up @@ -611,9 +602,8 @@ Sender::SenderState Sender::sendBlocks(ThreadData &data) {
return SEND_DONE_CMD;
}
WDT_CHECK(!source->hasError());
size_t totalBytes = threadStats.getTotalBytes(false);
TransferStats transferStats = sendOneByteSource(
data.socket_, data.throttler_, source, totalBytes, transferStatus);
TransferStats transferStats =
sendOneByteSource(data.socket_, source, transferStatus);
threadStats += transferStats;
source->addTransferStats(transferStats);
source->close();
Expand Down Expand Up @@ -867,25 +857,14 @@ void Sender::sendOne(int threadIndex) {
endTime_ = Clock::now();
transferFinished_ = true;
}
if (throttler_) {
throttler_->deRegisterTransfer();
}
});
std::unique_ptr<Throttler> throttler;
double avgRateBytes = perThreadThrottlerOptions_.avgRateBytes;
double maxRateBytes = perThreadThrottlerOptions_.maxRateBytes;
double bucketLimitBytes = perThreadThrottlerOptions_.bucketLimitBytes;
const bool doThrottling = (avgRateBytes > 0 || maxRateBytes > 0);
if (doThrottling) {
throttler = folly::make_unique<Throttler>(
startTime, avgRateBytes, maxRateBytes, bucketLimitBytes,
options.throttler_log_time_millis);
} else {
VLOG(1) << "No throttling in effect";
if (throttler_) {
throttler_->registerTransfer();
}

ThreadData threadData(threadIndex, queue, threadStats, transferHistories);
if (doThrottling) {
threadData.throttler_ = std::move(throttler);
}

SenderState state = CONNECT;
while (state != END) {
if (isAborted()) {
Expand Down Expand Up @@ -916,9 +895,7 @@ bool Sender::isAborted() {

TransferStats Sender::sendOneByteSource(
const std::unique_ptr<ClientSocket> &socket,
const std::unique_ptr<Throttler> &throttler,
const std::unique_ptr<ByteSource> &source, const size_t totalBytes,
ErrorCode transferStatus) {
const std::unique_ptr<ByteSource> &source, ErrorCode transferStatus) {
TransferStats stats;
auto &options = WdtOptions::get();
char headerBuf[Protocol::kMaxHeader];
Expand All @@ -945,6 +922,9 @@ TransferStats Sender::sendOneByteSource(
return stats;
}
stats.addHeaderBytes(written);
size_t byteSourceHeaderBytes = written;
size_t throttlerInstanceBytes = byteSourceHeaderBytes;
size_t totalThrottlerBytes = 0;
VLOG(3) << "Sent " << written << " on " << socket->getFd() << " : "
<< folly::humanify(std::string(headerBuf, off));
uint32_t checksum = 0;
Expand All @@ -962,17 +942,20 @@ TransferStats Sender::sendOneByteSource(
checksum = folly::crc32c((const uint8_t *)buffer, size, checksum);
}
written = 0;
if (throttler) {
if (throttler_) {
/**
* If throttling is enabled we call limit(totalBytes) which
* If throttling is enabled we call limit(deltaBytes) which
* used both the methods of throttling peak and average.
* Always call it with totalBytes written till now, throttler
* will do the rest. Total bytes includes header and the data bytes.
* The throttler was constructed at the time when the header
* was being written and it is okay to start throttling with the
* next expected write.
* Always call it with bytes being written to the wire, throttler
* will do the rest.
* The first time throttle is called with the header bytes
* included. In the next iterations throttler is only called
* with the bytes being written.
*/
throttler->limit(totalBytes + stats.getTotalBytes() + size);
throttlerInstanceBytes += size;
throttler_->limit(throttlerInstanceBytes);
totalThrottlerBytes += throttlerInstanceBytes;
throttlerInstanceBytes = 0;
}
do {
ssize_t w = socket->write(buffer + written, size - written);
Expand Down Expand Up @@ -1002,6 +985,9 @@ TransferStats Sender::sendOneByteSource(
}
actualSize += written;
}
if (throttler_) {
WDT_CHECK(totalThrottlerBytes == actualSize + byteSourceHeaderBytes);
}
if (actualSize != expectedSize) {
// Can only happen if sender thread can not read complete source byte
// stream
Expand Down
28 changes: 12 additions & 16 deletions Sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,6 @@ class ThreadTransferHistory {
folly::SpinLock lock_;
};

struct ThrottlerOptions {
double avgRateBytes;
double maxRateBytes;
double bucketLimitBytes;
};
/**
* The sender for the transfer. One instance of sender should only be
* responsible for one transfer. For a second transfer you should make
Expand Down Expand Up @@ -210,6 +205,13 @@ class Sender {
/// Makes the minimal transfer report using transfer stats of the thread
std::unique_ptr<TransferReport> getTransferReport();

/**
* Users of the wdt sender can set a throttler externally.
* This needs to be called before any call to transfer or transferAsync
* is made.
*/
void setThrottler(std::shared_ptr<Throttler> throttler);

private:
/// state machine states
enum SenderState {
Expand All @@ -234,7 +236,6 @@ class Sender {
DirectorySourceQueue &queue_;
TransferStats &threadStats_;
std::vector<ThreadTransferHistory> &transferHistories_;
std::unique_ptr<Throttler> throttler_;
std::unique_ptr<ClientSocket> socket_;
char buf_[Protocol::kMinBufLength];
bool totalSizeSent_{false};
Expand Down Expand Up @@ -362,9 +363,7 @@ class Sender {
/// Method responsible for sending one source to the destination
virtual TransferStats sendOneByteSource(
const std::unique_ptr<ClientSocket> &socket,
const std::unique_ptr<Throttler> &throttler,
const std::unique_ptr<ByteSource> &source, const size_t totalBytes,
ErrorCode transferStatus);
const std::unique_ptr<ByteSource> &source, ErrorCode transferStatus);

/// Every sender thread executes this method to send the data
void sendOne(int threadIndex);
Expand Down Expand Up @@ -405,11 +404,8 @@ class Sender {
*/
void reportProgress();

/**
* Configures per thread throttler options such as avg rate
* and peak rate in bytes/sec and bucket limit for Tocken Bucket
*/
void fillThrottlerOptions(ThrottlerOptions &throttlerOptions);
/// Configures the global throttler using the wdt options
void configureThrottler();

/// Pointer to DirectorySourceQueue which reads the srcDir and the files
std::unique_ptr<DirectorySourceQueue> dirQueue_;
Expand Down Expand Up @@ -454,8 +450,8 @@ class Sender {
std::vector<ThreadTransferHistory> transferHistories_;
/// flag representing whether transfer has been aborted or not
bool transferAborted_{false};
/// Throttler options
ThrottlerOptions perThreadThrottlerOptions_;
/// The global throttler shared across all the threads
std::shared_ptr<Throttler> throttler_;
};
}
} // namespace facebook::wdt
Loading

0 comments on commit 6882a40

Please sign in to comment.