Skip to content

Commit

Permalink
Introduced pause and resume to the pacer
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] committed Mar 22, 2013
1 parent f49577f commit 9522792
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 40 deletions.
15 changes: 15 additions & 0 deletions modules/pacing/include/paced_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class PacedSender : public Module {
kNormalPriority = 2, // Put in back of the line.
kLowPriority = 3, // Put in back of the low priority line.
};
// Low priority packets are mixed with the normal priority packets
// while we are paused.

class Callback {
public:
// Note: packets sent as a result of a callback should not pass by this
Expand All @@ -47,6 +50,12 @@ class PacedSender : public Module {
// Enable/disable pacing.
void SetStatus(bool enable);

// Temporarily pause all sending.
void Pause();

// Resume sending packets.
void Resume();

// Current total estimated bitrate.
void UpdateBitrate(int target_bitrate_kbps);

Expand Down Expand Up @@ -80,6 +89,10 @@ class PacedSender : public Module {
bool GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number,
int64_t* capture_time_ms);

// Local helper function to GetNextPacket.
void GetNextPacketFromList(std::list<Packet>* list,
uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms);

// Updates the number of bytes that can be sent for the next time interval.
void UpdateBytesPerInterval(uint32_t delta_time_in_ms);

Expand All @@ -88,13 +101,15 @@ class PacedSender : public Module {

Callback* callback_;
bool enable_;
bool paused_;
scoped_ptr<CriticalSectionWrapper> critsect_;
int target_bitrate_kbytes_per_s_;
int bytes_remaining_interval_;
int padding_bytes_remaining_interval_;
TickTime time_last_update_;
TickTime time_last_send_;

std::list<Packet> high_priority_packets_;
std::list<Packet> normal_priority_packets_;
std::list<Packet> low_priority_packets_;
};
Expand Down
102 changes: 75 additions & 27 deletions modules/pacing/paced_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ namespace webrtc {
PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps)
: callback_(callback),
enable_(false),
paused_(false),
critsect_(CriticalSectionWrapper::CreateCriticalSection()),
target_bitrate_kbytes_per_s_(target_bitrate_kbps >> 3), // Divide by 8.
bytes_remaining_interval_(0),
Expand All @@ -48,10 +49,21 @@ PacedSender::PacedSender(Callback* callback, int target_bitrate_kbps)
}

PacedSender::~PacedSender() {
high_priority_packets_.clear();
normal_priority_packets_.clear();
low_priority_packets_.clear();
}

void PacedSender::Pause() {
CriticalSectionScoped cs(critsect_.get());
paused_ = true;
}

void PacedSender::Resume() {
CriticalSectionScoped cs(critsect_.get());
paused_ = false;
}

void PacedSender::SetStatus(bool enable) {
CriticalSectionScoped cs(critsect_.get());
enable_ = enable;
Expand All @@ -70,20 +82,47 @@ bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
UpdateState(bytes);
return true; // We can send now.
}
if (paused_) {
// Queue all packets when we are paused.
switch (priority) {
case kHighPriority:
high_priority_packets_.push_back(
Packet(ssrc, sequence_number, capture_time_ms, bytes));
break;
case kNormalPriority:
case kLowPriority:
// Queue the low priority packets in the normal priority queue when we
// are paused to avoid starvation.
normal_priority_packets_.push_back(
Packet(ssrc, sequence_number, capture_time_ms, bytes));
break;
}
return false;
}

switch (priority) {
case kHighPriority:
UpdateState(bytes);
return true; // We can send now.
if (high_priority_packets_.empty() &&
bytes_remaining_interval_ > 0) {
UpdateState(bytes);
return true; // We can send now.
}
high_priority_packets_.push_back(
Packet(ssrc, sequence_number, capture_time_ms, bytes));
return false;
case kNormalPriority:
if (normal_priority_packets_.empty() && bytes_remaining_interval_ > 0) {
if (high_priority_packets_.empty() &&
normal_priority_packets_.empty() &&
bytes_remaining_interval_ > 0) {
UpdateState(bytes);
return true; // We can send now.
}
normal_priority_packets_.push_back(
Packet(ssrc, sequence_number, capture_time_ms, bytes));
return false;
case kLowPriority:
if (normal_priority_packets_.empty() &&
if (high_priority_packets_.empty() &&
normal_priority_packets_.empty() &&
low_priority_packets_.empty() &&
bytes_remaining_interval_ > 0) {
UpdateState(bytes);
Expand Down Expand Up @@ -114,7 +153,7 @@ int32_t PacedSender::Process() {
CriticalSectionScoped cs(critsect_.get());
int elapsed_time_ms = (now - time_last_update_).Milliseconds();
time_last_update_ = now;
if (elapsed_time_ms > 0) {
if (!paused_ && elapsed_time_ms > 0) {
uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
UpdateBytesPerInterval(delta_time_ms);
uint32_t ssrc;
Expand All @@ -125,7 +164,8 @@ int32_t PacedSender::Process() {
callback_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
critsect_->Enter();
}
if (normal_priority_packets_.empty() &&
if (high_priority_packets_.empty() &&
normal_priority_packets_.empty() &&
low_priority_packets_.empty() &&
padding_bytes_remaining_interval_ > 0) {
critsect_->Leave();
Expand Down Expand Up @@ -164,41 +204,49 @@ bool PacedSender::GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number,
if (bytes_remaining_interval_ <= 0) {
// All bytes consumed for this interval.
// Check if we have not sent in a too long time.
if (!normal_priority_packets_.empty()) {
if ((TickTime::Now() - time_last_send_).Milliseconds() >
kMaxQueueTimeWithoutSendingMs) {
Packet packet = normal_priority_packets_.front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
normal_priority_packets_.pop_front();
if ((TickTime::Now() - time_last_send_).Milliseconds() >
kMaxQueueTimeWithoutSendingMs) {
if (!high_priority_packets_.empty()) {
GetNextPacketFromList(&high_priority_packets_, ssrc, sequence_number,
capture_time_ms);
return true;
}
if (!normal_priority_packets_.empty()) {
GetNextPacketFromList(&normal_priority_packets_, ssrc, sequence_number,
capture_time_ms);
return true;
}
}
return false;
}
if (!high_priority_packets_.empty()) {
GetNextPacketFromList(&high_priority_packets_, ssrc, sequence_number,
capture_time_ms);
return true;
}
if (!normal_priority_packets_.empty()) {
Packet packet = normal_priority_packets_.front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
normal_priority_packets_.pop_front();
GetNextPacketFromList(&normal_priority_packets_, ssrc, sequence_number,
capture_time_ms);
return true;
}
if (!low_priority_packets_.empty()) {
Packet packet = low_priority_packets_.front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
low_priority_packets_.pop_front();
GetNextPacketFromList(&low_priority_packets_, ssrc, sequence_number,
capture_time_ms);
return true;
}
return false;
}

void PacedSender::GetNextPacketFromList(std::list<Packet>* list,
uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms) {
Packet packet = list->front();
UpdateState(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
list->pop_front();
}

// MUST have critsect_ when calling.
void PacedSender::UpdateState(int num_bytes) {
time_last_send_ = TickTime::Now();
Expand Down
83 changes: 70 additions & 13 deletions modules/pacing/paced_sender_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@

#include "webrtc/modules/pacing/include/paced_sender.h"

namespace {
const int kTargetBitrate = 800;
};
using testing::_;

namespace webrtc {
namespace test {

static const int kTargetBitrate = 800;

class MockPacedSenderCallback : public PacedSender::Callback {
public:
Expand Down Expand Up @@ -54,7 +55,7 @@ TEST_F(PacedSenderTest, QueuePacket) {
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number, capture_time_ms, 250));
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0);
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, sequence_number, capture_time_ms)).Times(0);
TickTime::AdvanceFakeClock(4);
Expand Down Expand Up @@ -87,12 +88,12 @@ TEST_F(PacedSenderTest, PaceQueuedPackets) {
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, capture_time_ms, 250));
}
EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0);
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
for (int k = 0; k < 10; ++k) {
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, testing::_, capture_time_ms)).Times(3);
TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}
Expand Down Expand Up @@ -159,26 +160,82 @@ TEST_F(PacedSenderTest, Priority) {
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kHighPriority,
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kHighPriority,
ssrc, sequence_number++, capture_time_ms, 250));

// Expect all normal priority to be sent out first.
EXPECT_CALL(callback_, TimeToSendPadding(testing::_)).Times(0);
EXPECT_CALL(callback_,
TimeToSendPacket(ssrc, testing::_, capture_time_ms)).Times(2);
// Expect all high and normal priority to be sent out first.
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3);

EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());

EXPECT_CALL(callback_, TimeToSendPacket(
ssrc_low_priority, _, capture_time_ms_low_priority)).Times(1);

EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}

TEST_F(PacedSenderTest, Pause) {
uint32_t ssrc_low_priority = 12345;
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
int64_t capture_time_ms = 56789;
int64_t second_capture_time_ms = 67890;

// Due to the multiplicative factor we can send 3 packets not 2 packets.
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kLowPriority,
ssrc_low_priority, sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_TRUE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));

send_bucket_->Pause();

// Expect everything to be queued.
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kLowPriority,
ssrc_low_priority, sequence_number++, capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
ssrc, sequence_number++, second_capture_time_ms, 250));
EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kHighPriority,
ssrc, sequence_number++, capture_time_ms, 250));

// Expect no packet to come out while paused.
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
EXPECT_CALL(callback_, TimeToSendPacket(_, _, _)).Times(0);

for (int i = 0; i < 10; ++i) {
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}
// Expect high prio packets to come out first followed by all packets in the
// way they were added.
EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms)).Times(3);

send_bucket_->Resume();

EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());

EXPECT_CALL(callback_, TimeToSendPacket(ssrc_low_priority,
testing::_, capture_time_ms_low_priority)).Times(1);
EXPECT_CALL(callback_,
TimeToSendPacket(_, _, second_capture_time_ms)).Times(1);

EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}

} // namespace test
} // namespace webrtc

0 comments on commit 9522792

Please sign in to comment.