Skip to content

Commit

Permalink
dcsctp: Only reset paused streams when peer acks
Browse files Browse the repository at this point in the history
When a single stream is reset, and an outgoing SSN reset request is sent
and later acked by the peer sending a reconfiguration response with
status=Performed, the sender should unpause the paused stream and reset
the SSNs of that (ordered) stream. But only the single stream that was
paused, and not all streams. In this scenario, dcSCTP would - when the
peer acked the SSN reset request - reset the SSN of all streams.

This was found by [email protected] using a data channel test
application. The peer, if it's a usrsctp client, will ABORT with
PROTOCOL_VIOLATION as it has already seen that SSN on that stream but
with a different TSN.

This bug was introduced when implementing the Round Robin scheduler in
https://webrtc-review.googlesource.com/c/src/+/219682. The FCFS
scheduler prior to this change was implemented correctly.

Bug: webrtc:12952
Change-Id: I3ea144a1df303145f69a5b03aada7f448c8c8163
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/225266
Reviewed-by: Florent Castelli <[email protected]>
Commit-Queue: Victor Boivie <[email protected]>
Cr-Commit-Position: refs/heads/master@{#34436}
  • Loading branch information
Victor Boivie authored and WebRTC LUCI CQ committed Jul 8, 2021
1 parent 706ef1b commit 8bd26e1
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 1 deletion.
105 changes: 105 additions & 0 deletions net/dcsctp/socket/dcsctp_socket_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,33 @@ constexpr SendOptions kSendOptions;
constexpr size_t kLargeMessageSize = DcSctpOptions::kMaxSafeMTUSize * 20;
static constexpr size_t kSmallMessageSize = 10;

MATCHER_P(HasDataChunkWithStreamId, stream_id, "") {
absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
if (!packet.has_value()) {
*result_listener << "data didn't parse as an SctpPacket";
return false;
}

if (packet->descriptors()[0].type != DataChunk::kType) {
*result_listener << "the first chunk in the packet is not a data chunk";
return false;
}

absl::optional<DataChunk> dc =
DataChunk::Parse(packet->descriptors()[0].data);
if (!dc.has_value()) {
*result_listener << "The first chunk didn't parse as a data chunk";
return false;
}

if (dc->stream_id() != stream_id) {
*result_listener << "the stream_id is " << *dc->stream_id();
return false;
}

return true;
}

MATCHER_P(HasDataChunkWithSsn, ssn, "") {
absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
if (!packet.has_value()) {
Expand Down Expand Up @@ -888,6 +915,84 @@ TEST_F(DcSctpSocketTest, ResetStreamWillMakeChunksStartAtZeroSsn) {
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
}

TEST_F(DcSctpSocketTest, ResetStreamWillOnlyResetTheRequestedStreams) {
ConnectSockets();

std::vector<uint8_t> payload(options_.mtu - 100);

// Send two ordered messages on SID 1
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {});
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {});

auto packet1 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet1, HasDataChunkWithStreamId(StreamID(1)));
EXPECT_THAT(packet1, HasDataChunkWithSsn(SSN(0)));
sock_z_.ReceivePacket(packet1);

auto packet2 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet1, HasDataChunkWithStreamId(StreamID(1)));
EXPECT_THAT(packet2, HasDataChunkWithSsn(SSN(1)));
sock_z_.ReceivePacket(packet2);

// Handle SACK
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());

// Do the same, for SID 3
sock_a_.Send(DcSctpMessage(StreamID(3), PPID(53), payload), {});
sock_a_.Send(DcSctpMessage(StreamID(3), PPID(53), payload), {});
auto packet3 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet3, HasDataChunkWithStreamId(StreamID(3)));
EXPECT_THAT(packet3, HasDataChunkWithSsn(SSN(0)));
sock_z_.ReceivePacket(packet3);
auto packet4 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet4, HasDataChunkWithStreamId(StreamID(3)));
EXPECT_THAT(packet4, HasDataChunkWithSsn(SSN(1)));
sock_z_.ReceivePacket(packet4);
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());

// Receive all messages.
absl::optional<DcSctpMessage> msg1 = cb_z_.ConsumeReceivedMessage();
ASSERT_TRUE(msg1.has_value());
EXPECT_EQ(msg1->stream_id(), StreamID(1));

absl::optional<DcSctpMessage> msg2 = cb_z_.ConsumeReceivedMessage();
ASSERT_TRUE(msg2.has_value());
EXPECT_EQ(msg2->stream_id(), StreamID(1));

absl::optional<DcSctpMessage> msg3 = cb_z_.ConsumeReceivedMessage();
ASSERT_TRUE(msg3.has_value());
EXPECT_EQ(msg3->stream_id(), StreamID(3));

absl::optional<DcSctpMessage> msg4 = cb_z_.ConsumeReceivedMessage();
ASSERT_TRUE(msg4.has_value());
EXPECT_EQ(msg4->stream_id(), StreamID(3));

// Reset SID 1. This will directly send a RE-CONFIG.
sock_a_.ResetStreams(std::vector<StreamID>({StreamID(3)}));
// RE-CONFIG, req
sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());
// RE-CONFIG, resp
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());

// Send a message on SID 1 and 3 - SID 1 should not be reset, but 3 should.
sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {});

sock_a_.Send(DcSctpMessage(StreamID(3), PPID(53), payload), {});

auto packet5 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet5, HasDataChunkWithStreamId(StreamID(1)));
EXPECT_THAT(packet5, HasDataChunkWithSsn(SSN(2))); // Unchanged.
sock_z_.ReceivePacket(packet5);

auto packet6 = cb_a_.ConsumeSentPacket();
EXPECT_THAT(packet6, HasDataChunkWithStreamId(StreamID(3)));
EXPECT_THAT(packet6, HasDataChunkWithSsn(SSN(0))); // Reset.
sock_z_.ReceivePacket(packet6);

// Handle SACK
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
}

TEST_F(DcSctpSocketTest, OnePeerReconnects) {
ConnectSockets();

Expand Down
6 changes: 5 additions & 1 deletion net/dcsctp/tx/rr_send_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,11 @@ bool RRSendQueue::CanResetStreams() const {
}

void RRSendQueue::CommitResetStreams() {
Reset();
for (auto& stream_entry : streams_) {
if (stream_entry.second.is_paused()) {
stream_entry.second.Reset();
}
}
RTC_DCHECK(IsConsistent());
}

Expand Down
41 changes: 41 additions & 0 deletions net/dcsctp/tx/rr_send_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,47 @@ TEST_F(RRSendQueueTest, CommittingResetsSSN) {
EXPECT_EQ(chunk_three->data.ssn, SSN(0));
}

TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) {
std::vector<uint8_t> payload(50);

buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));

absl::optional<SendQueue::DataToSend> chunk_one =
buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_EQ(chunk_one->data.stream_id, StreamID(1));
EXPECT_EQ(chunk_one->data.ssn, SSN(0));

absl::optional<SendQueue::DataToSend> chunk_two =
buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_two.has_value());
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
EXPECT_EQ(chunk_two->data.ssn, SSN(0));

StreamID stream_ids[] = {StreamID(3)};
buf_.PrepareResetStreams(stream_ids);

// Send two more messages - SID 3 will buffer, SID 1 will send.
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));

EXPECT_TRUE(buf_.CanResetStreams());
buf_.CommitResetStreams();

absl::optional<SendQueue::DataToSend> chunk_three =
buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_three.has_value());
EXPECT_EQ(chunk_three->data.stream_id, StreamID(1));
EXPECT_EQ(chunk_three->data.ssn, SSN(1));

absl::optional<SendQueue::DataToSend> chunk_four =
buf_.Produce(kNow, kOneFragmentPacketSize);
ASSERT_TRUE(chunk_four.has_value());
EXPECT_EQ(chunk_four->data.stream_id, StreamID(3));
EXPECT_EQ(chunk_four->data.ssn, SSN(0));
}

TEST_F(RRSendQueueTest, RollBackResumesSSN) {
std::vector<uint8_t> payload(50);

Expand Down

0 comments on commit 8bd26e1

Please sign in to comment.