Skip to content

Commit

Permalink
Remove pending packets from the pacer when an RTP module is removed.
Browse files Browse the repository at this point in the history
This CL adds functionality to remove packets matching a given SSRC from
the pacer queue, and calls that with any SSRCs used by an RTP module
when that module is removed.

Bug: chromium:1395081
Change-Id: I13c0285ddca600e784ad04a806727a508ede6dcc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/287124
Reviewed-by: Jakob Ivarsson‎ <[email protected]>
Commit-Queue: Erik Språng <[email protected]>
Reviewed-by: Ilya Nikolaevskiy <[email protected]>
Reviewed-by: Philip Eliasson <[email protected]>
Cr-Commit-Position: refs/heads/main@{#38880}
  • Loading branch information
Erik Språng authored and WebRTC LUCI CQ committed Dec 13, 2022
1 parent b02a8f5 commit 1b11b58
Show file tree
Hide file tree
Showing 14 changed files with 296 additions and 44 deletions.
6 changes: 6 additions & 0 deletions audio/channel_send.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ class RtpPacketSenderProxy : public RtpPacketSender {
rtp_packet_pacer_->EnqueuePackets(std::move(packets));
}

void RemovePacketsForSsrc(uint32_t ssrc) override {
MutexLock lock(&mutex_);
rtp_packet_pacer_->RemovePacketsForSsrc(ssrc);
}

private:
SequenceChecker thread_checker_;
Mutex mutex_;
Expand Down Expand Up @@ -565,6 +570,7 @@ void ChannelSend::StopSend() {

RTC_DCHECK(packet_router_);
packet_router_->RemoveSendRtpModule(rtp_rtcp_.get());
rtp_packet_pacer_proxy_->RemovePacketsForSsrc(rtp_rtcp_->SSRC());
}

void ChannelSend::SetEncoder(int payload_type,
Expand Down
11 changes: 11 additions & 0 deletions call/rtp_video_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,17 @@ void RtpVideoSender::SetActiveModulesLocked(
// prevent any stray packets in the pacer from asynchronously arriving
// to a disabled module.
transport_->packet_router()->RemoveSendRtpModule(&rtp_module);

// Clear the pacer queue of any packets pertaining to this module.
transport_->packet_sender()->RemovePacketsForSsrc(rtp_module.SSRC());
if (rtp_module.RtxSsrc().has_value()) {
transport_->packet_sender()->RemovePacketsForSsrc(
*rtp_module.RtxSsrc());
}
if (rtp_module.FlexfecSsrc().has_value()) {
transport_->packet_sender()->RemovePacketsForSsrc(
*rtp_module.FlexfecSsrc());
}
}

// If set to false this module won't send media.
Expand Down
77 changes: 77 additions & 0 deletions call/rtp_video_sender_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1098,4 +1098,81 @@ TEST(RtpVideoSenderTest, OverheadIsSubtractedFromTargetBitrate) {
}
}

TEST(RtpVideoSenderTest, ClearsPendingPacketsOnInactivation) {
RtpVideoSenderTestFixture test({kSsrc1}, {kRtxSsrc1}, kPayloadType, {});
test.SetActiveModules({true});

RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
kDependencyDescriptorExtensionId);
std::vector<RtpPacket> sent_packets;
ON_CALL(test.transport(), SendRtp)
.WillByDefault([&](const uint8_t* packet, size_t length,
const PacketOptions& options) {
sent_packets.emplace_back(&extensions);
EXPECT_TRUE(sent_packets.back().Parse(packet, length));
return true;
});

// Set a very low bitrate.
test.router()->OnBitrateUpdated(
CreateBitrateAllocationUpdate(/*rate_bps=*/30'000),
/*framerate=*/30);

// Create and send a large keyframe.
const size_t kImageSizeBytes = 10000;
constexpr uint8_t kPayload[kImageSizeBytes] = {'a'};
EncodedImage encoded_image;
encoded_image.SetTimestamp(1);
encoded_image.capture_time_ms_ = 2;
encoded_image._frameType = VideoFrameType::kVideoFrameKey;
encoded_image.SetEncodedData(
EncodedImageBuffer::Create(kPayload, sizeof(kPayload)));
EXPECT_EQ(test.router()
->OnEncodedImage(encoded_image, /*codec_specific=*/nullptr)
.error,
EncodedImageCallback::Result::OK);

// Advance time a small amount, check that sent data is only part of the
// image.
test.AdvanceTime(TimeDelta::Millis(5));
DataSize transmittedPayload = DataSize::Zero();
for (const RtpPacket& packet : sent_packets) {
transmittedPayload += DataSize::Bytes(packet.payload_size());
// Make sure we don't see the end of the frame.
EXPECT_FALSE(packet.Marker());
}
EXPECT_GT(transmittedPayload, DataSize::Zero());
EXPECT_LT(transmittedPayload, DataSize::Bytes(kImageSizeBytes / 4));

// Record the RTP timestamp of the first frame.
const uint32_t first_frame_timestamp = sent_packets[0].Timestamp();
sent_packets.clear();

// Disable the sending module and advance time slightly. No packets should be
// sent.
test.SetActiveModules({false});
test.AdvanceTime(TimeDelta::Millis(20));
EXPECT_TRUE(sent_packets.empty());

// Reactive the send module - any packets should have been removed, so nothing
// should be transmitted.
test.SetActiveModules({true});
test.AdvanceTime(TimeDelta::Millis(33));
EXPECT_TRUE(sent_packets.empty());

// Send a new frame.
encoded_image.SetTimestamp(3);
encoded_image.capture_time_ms_ = 4;
EXPECT_EQ(test.router()
->OnEncodedImage(encoded_image, /*codec_specific=*/nullptr)
.error,
EncodedImageCallback::Result::OK);
test.AdvanceTime(TimeDelta::Millis(33));

// Advance time, check we get new packets - but only for the second frame.
EXPECT_FALSE(sent_packets.empty());
EXPECT_NE(sent_packets[0].Timestamp(), first_frame_timestamp);
}

} // namespace webrtc
4 changes: 4 additions & 0 deletions modules/pacing/pacing_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ void PacingController::SetCircuitBreakerThreshold(int num_iterations) {
circuit_breaker_threshold_ = num_iterations;
}

void PacingController::RemovePacketsForSsrc(uint32_t ssrc) {
packet_queue_.RemovePacketsForSsrc(ssrc);
}

bool PacingController::IsProbing() const {
return prober_.is_probing();
}
Expand Down
3 changes: 3 additions & 0 deletions modules/pacing/pacing_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ class PacingController {
// is considered erroneous to exceed.
void SetCircuitBreakerThreshold(int num_iterations);

// Remove any pending packets matching this SSRC from the packet queue.
void RemovePacketsForSsrc(uint32_t ssrc);

private:
TimeDelta UpdateTimeAndGetElapsed(Timestamp now);
bool ShouldSendKeepalive(Timestamp now) const;
Expand Down
150 changes: 107 additions & 43 deletions modules/pacing/prioritized_packet_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ bool PrioritizedPacketQueue::StreamQueue::EnqueuePacket(QueuedPacket packet,
}

PrioritizedPacketQueue::QueuedPacket
PrioritizedPacketQueue::StreamQueue::DequePacket(int priority_level) {
PrioritizedPacketQueue::StreamQueue::DequeuePacket(int priority_level) {
RTC_DCHECK(!packets_[priority_level].empty());
QueuedPacket packet = std::move(packets_[priority_level].front());
packets_[priority_level].pop_front();
Expand Down Expand Up @@ -91,6 +91,16 @@ Timestamp PrioritizedPacketQueue::StreamQueue::LastEnqueueTime() const {
return last_enqueue_time_;
}

std::array<std::deque<PrioritizedPacketQueue::QueuedPacket>,
PrioritizedPacketQueue::kNumPriorityLevels>
PrioritizedPacketQueue::StreamQueue::DequeueAll() {
std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio;
for (int i = 0; i < kNumPriorityLevels; ++i) {
packets_by_prio[i].swap(packets_[i]);
}
return packets_by_prio;
}

PrioritizedPacketQueue::PrioritizedPacketQueue(Timestamp creation_time)
: queue_time_sum_(TimeDelta::Zero()),
pause_time_sum_(TimeDelta::Zero()),
Expand Down Expand Up @@ -162,54 +172,16 @@ std::unique_ptr<RtpPacketToSend> PrioritizedPacketQueue::Pop() {

RTC_DCHECK_GE(top_active_prio_level_, 0);
StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front();
QueuedPacket packet = stream_queue.DequePacket(top_active_prio_level_);
--size_packets_;
RTC_DCHECK(packet.packet->packet_type().has_value());
RtpPacketMediaType packet_type = packet.packet->packet_type().value();
--size_packets_per_media_type_[static_cast<size_t>(packet_type)];
RTC_DCHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)],
0);
size_payload_ -= packet.PacketSize();

// Calculate the total amount of time spent by this packet in the queue
// while in a non-paused state. Note that the `pause_time_sum_ms_` was
// subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
// by subtracting it now we effectively remove the time spent in in the
// queue while in a paused state.
TimeDelta time_in_non_paused_state =
last_update_time_ - packet.enqueue_time - pause_time_sum_;
queue_time_sum_ -= time_in_non_paused_state;

// Set the time spent in the send queue, which is the per-packet equivalent of
// totalPacketSendDelay. The notion of being paused is an implementation
// detail that we do not want to expose, so it makes sense to report the
// metric excluding the pause time. This also avoids spikes in the metric.
// https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay
packet.packet->set_time_in_send_queue(time_in_non_paused_state);

RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());

RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end());
enqueue_times_.erase(packet.enqueue_time_iterator);
QueuedPacket packet = stream_queue.DequeuePacket(top_active_prio_level_);
DequeuePacketInternal(packet);

// Remove StreamQueue from head of fifo-queue for this prio level, and
// and add it to the end if it still has packets.
streams_by_prio_[top_active_prio_level_].pop_front();
if (stream_queue.HasPacketsAtPrio(top_active_prio_level_)) {
streams_by_prio_[top_active_prio_level_].push_back(&stream_queue);
} else if (streams_by_prio_[top_active_prio_level_].empty()) {
// No stream queues have packets at this prio level, find top priority
// that is not empty.
if (size_packets_ == 0) {
top_active_prio_level_ = -1;
} else {
for (int i = 0; i < kNumPriorityLevels; ++i) {
if (!streams_by_prio_[i].empty()) {
top_active_prio_level_ = i;
break;
}
}
}
} else {
MaybeUpdateTopPrioLevel();
}

return std::move(packet.packet);
Expand Down Expand Up @@ -276,4 +248,96 @@ void PrioritizedPacketQueue::SetPauseState(bool paused, Timestamp now) {
paused_ = paused;
}

void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) {
auto kv = streams_.find(ssrc);
if (kv != streams_.end()) {
// Dequeue all packets from the queue for this SSRC.
StreamQueue& queue = *kv->second;
std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio =
queue.DequeueAll();
for (int i = 0; i < kNumPriorityLevels; ++i) {
std::deque<QueuedPacket>& packet_queue = packets_by_prio[i];
if (packet_queue.empty()) {
continue;
}

// First erase all packets at this prio level.
while (!packet_queue.empty()) {
QueuedPacket packet = std::move(packet_queue.front());
packet_queue.pop_front();
DequeuePacketInternal(packet);
}

// Next, deregister this `StreamQueue` from the round-robin tables.
RTC_DCHECK(!streams_by_prio_[i].empty());
if (streams_by_prio_[i].size() == 1) {
// This is the last and only queue that had packets for this prio level.
// Update the global top prio level if neccessary.
RTC_DCHECK(streams_by_prio_[i].front() == &queue);
streams_by_prio_[i].pop_front();
if (i == top_active_prio_level_) {
MaybeUpdateTopPrioLevel();
}
} else {
// More than stream had packets at this prio level, filter this one out.
std::deque<StreamQueue*> filtered_queue;
for (StreamQueue* queue_ptr : streams_by_prio_[i]) {
if (queue_ptr != &queue) {
filtered_queue.push_back(queue_ptr);
}
}
streams_by_prio_[i].swap(filtered_queue);
}
}
}
}

void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) {
--size_packets_;
RTC_DCHECK(packet.packet->packet_type().has_value());
RtpPacketMediaType packet_type = packet.packet->packet_type().value();
--size_packets_per_media_type_[static_cast<size_t>(packet_type)];
RTC_DCHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)],
0);
size_payload_ -= packet.PacketSize();

// Calculate the total amount of time spent by this packet in the queue
// while in a non-paused state. Note that the `pause_time_sum_ms_` was
// subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
// by subtracting it now we effectively remove the time spent in in the
// queue while in a paused state.
TimeDelta time_in_non_paused_state =
last_update_time_ - packet.enqueue_time - pause_time_sum_;
queue_time_sum_ -= time_in_non_paused_state;

// Set the time spent in the send queue, which is the per-packet equivalent of
// totalPacketSendDelay. The notion of being paused is an implementation
// detail that we do not want to expose, so it makes sense to report the
// metric excluding the pause time. This also avoids spikes in the metric.
// https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay
packet.packet->set_time_in_send_queue(time_in_non_paused_state);

RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());

RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end());
enqueue_times_.erase(packet.enqueue_time_iterator);
}

void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() {
if (streams_by_prio_[top_active_prio_level_].empty()) {
// No stream queues have packets at this prio level, find top priority
// that is not empty.
if (size_packets_ == 0) {
top_active_prio_level_ = -1;
} else {
for (int i = 0; i < kNumPriorityLevels; ++i) {
if (!streams_by_prio_[i].empty()) {
top_active_prio_level_ = i;
break;
}
}
}
}
}

} // namespace webrtc
16 changes: 15 additions & 1 deletion modules/pacing/prioritized_packet_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

#include <stddef.h>

#include <array>
#include <deque>
#include <list>
#include <memory>
#include <unordered_map>
#include <vector>

#include "api/units/data_size.h"
#include "api/units/time_delta.h"
Expand Down Expand Up @@ -80,6 +82,9 @@ class PrioritizedPacketQueue {
// Set the pause state, while `paused` is true queuing time is not counted.
void SetPauseState(bool paused, Timestamp now);

// Remove any packets matching the given SSRC.
void RemovePacketsForSsrc(uint32_t ssrc);

private:
static constexpr int kNumPriorityLevels = 4;

Expand Down Expand Up @@ -107,18 +112,27 @@ class PrioritizedPacketQueue {
// count for that priority level went from zero to non-zero.
bool EnqueuePacket(QueuedPacket packet, int priority_level);

QueuedPacket DequePacket(int priority_level);
QueuedPacket DequeuePacket(int priority_level);

bool HasPacketsAtPrio(int priority_level) const;
bool IsEmpty() const;
Timestamp LeadingPacketEnqueueTime(int priority_level) const;
Timestamp LastEnqueueTime() const;

std::array<std::deque<QueuedPacket>, kNumPriorityLevels> DequeueAll();

private:
std::deque<QueuedPacket> packets_[kNumPriorityLevels];
Timestamp last_enqueue_time_;
};

// Remove the packet from the internal state, e.g. queue time / size etc.
void DequeuePacketInternal(QueuedPacket& packet);

// Check if the queue pointed to by `top_active_prio_level_` is empty and
// if so move it to the lowest non-empty index.
void MaybeUpdateTopPrioLevel();

// Cumulative sum, over all packets, of time spent in the queue.
TimeDelta queue_time_sum_;
// Cumulative sum of time the queue has spent in a paused state.
Expand Down
Loading

0 comments on commit 1b11b58

Please sign in to comment.