Skip to content

Commit

Permalink
Fix fuzzer bug b/309716763 (grpc#35278)
Browse files Browse the repository at this point in the history
Also cleanup a little so we're not copying redundant frame headers everywhere.

Closes grpc#35278

COPYBARA_INTEGRATE_REVIEW=grpc#35278 from ctiller:fuzz-309716763 52589ff
PiperOrigin-RevId: 590042072
  • Loading branch information
ctiller authored and copybara-github committed Dec 12, 2023
1 parent ff63ad9 commit 9118cb1
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 73 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6214,6 +6214,7 @@ grpc_cc_library(
],
deps = [
"bitset",
"//:gpr",
"//:gpr_platform",
],
)
Expand Down
12 changes: 5 additions & 7 deletions src/core/ext/transport/chaotic_good/client_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "src/core/ext/transport/chaotic_good/client_transport.h"

#include <cstdint>
#include <memory>
#include <string>
#include <tuple>
Expand Down Expand Up @@ -77,8 +78,7 @@ ClientTransport::ClientTransport(
control_endpoint_write_buffer_.Append(
frame->Serialize(hpack_compressor_.get()));
if (frame->message != nullptr) {
std::string message_padding(
frame->frame_header.message_padding, '0');
std::string message_padding(frame->message_padding, '0');
Slice slice(grpc_slice_from_cpp_string(message_padding));
// Append message padding to data_endpoint_buffer.
data_endpoint_write_buffer_.Append(std::move(slice));
Expand Down Expand Up @@ -157,11 +157,9 @@ ClientTransport::ClientTransport(
// Move message into frame.
frame.message = arena_->MakePooled<Message>(
std::move(data_endpoint_read_buffer_), 0);
auto stream_id = frame.frame_header.stream_id;
{
MutexLock lock(&mu_);
return stream_map_[stream_id]->Push(ServerFrame(std::move(frame)));
}
MutexLock lock(&mu_);
const uint32_t stream_id = frame_header_->stream_id;
return stream_map_[stream_id]->Push(ServerFrame(std::move(frame)));
},
// Check if send frame to corresponding stream successfully.
[](bool ret) -> LoopCtl<absl::Status> {
Expand Down
65 changes: 31 additions & 34 deletions src/core/ext/transport/chaotic_good/client_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,40 +107,37 @@ class ClientTransport {
return TrySeq(
TryJoin(
// Continuously send client frame with client to server messages.
ForEach(
std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
client_initial_metadata =
std::move(call_args.client_initial_metadata),
outgoing_frames = outgoing_frames_.MakeSender(),
this](MessageHandle result) mutable {
ClientFragmentFrame frame;
// Construct frame header (flags, header_length and
// trailer_length will be added in serialization).
uint32_t message_length = result->payload()->Length();
uint32_t message_padding = message_length % aligned_bytes;
frame.frame_header = FrameHeader{
FrameType::kFragment, {}, stream_id, 0, message_length,
message_padding, 0};
frame.message = std::move(result);
if (initial_frame) {
// Send initial frame with client intial metadata.
frame.headers = std::move(client_initial_metadata);
initial_frame = false;
}
return TrySeq(
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
// TODO(ladynana): propagate the actual error message
// from EventEngine.
return absl::UnavailableError(
"Transport closed due to endpoint write/read "
"failed.");
}
return absl::OkStatus();
});
}),
ForEach(std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
client_initial_metadata =
std::move(call_args.client_initial_metadata),
outgoing_frames = outgoing_frames_.MakeSender(),
this](MessageHandle result) mutable {
ClientFragmentFrame frame;
// Construct frame header (flags, header_length and
// trailer_length will be added in serialization).
uint32_t message_length = result->payload()->Length();
frame.stream_id = stream_id;
frame.message_padding = message_length % aligned_bytes;
frame.message = std::move(result);
if (initial_frame) {
// Send initial frame with client intial metadata.
frame.headers = std::move(client_initial_metadata);
initial_frame = false;
}
return TrySeq(
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
// TODO(ladynana): propagate the actual error
// message from EventEngine.
return absl::UnavailableError(
"Transport closed due to endpoint write/read "
"failed.");
}
return absl::OkStatus();
});
}),
// Continuously receive server frames from endpoints and save
// results to call_args.
Loop([server_initial_metadata = call_args.server_initial_metadata,
Expand Down
65 changes: 51 additions & 14 deletions src/core/ext/transport/chaotic_good/frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ const NoDestruct<Slice> kZeroSlice{[] {

class FrameSerializer {
public:
explicit FrameSerializer(FrameHeader header) : header_(header) {
explicit FrameSerializer(FrameType frame_type, uint32_t stream_id,
uint32_t message_padding) {
output_.AppendIndexed(kZeroSlice->Copy());
// Initialize header flags, header_length, trailer_length to 0.
header_.type = frame_type;
header_.stream_id = stream_id;
header_.message_padding = message_padding;
header_.flags.SetAll(false);
header_.header_length = 0;
header_.trailer_length = 0;
}
// If called, must be called before AddTrailers, Finish.
SliceBuffer& AddHeaders() {
Expand Down Expand Up @@ -173,19 +174,21 @@ absl::Status SettingsFrame::Deserialize(HPackParser*, const FrameHeader& header,
}

SliceBuffer SettingsFrame::Serialize(HPackCompressor*) const {
FrameSerializer serializer(
FrameHeader{FrameType::kSettings, {}, 0, 0, 0, 0, 0});
FrameSerializer serializer(FrameType::kSettings, 0, 0);
return serializer.Finish();
}

std::string SettingsFrame::ToString() const { return "SettingsFrame{}"; }

absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
const FrameHeader& header,
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) {
if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id");
}
frame_header = header;
stream_id = header.stream_id;
message_padding = header.message_padding;
if (header.type != FrameType::kFragment) {
return absl::InvalidArgumentError("Expected fragment frame");
}
Expand All @@ -197,6 +200,9 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
if (r.value() != nullptr) {
headers = std::move(r.value());
}
} else if (header.header_length != 0) {
return absl::InvalidArgumentError(absl::StrCat(
"Unexpected non-zero header length", header.header_length));
}
if (header.flags.is_set(1)) {
if (header.trailer_length != 0) {
Expand All @@ -210,8 +216,8 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
}

SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const {
GPR_ASSERT(frame_header.stream_id != 0);
FrameSerializer serializer(frame_header);
GPR_ASSERT(stream_id != 0);
FrameSerializer serializer(FrameType::kFragment, stream_id, message_padding);
if (headers.get() != nullptr) {
encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders());
}
Expand All @@ -221,14 +227,25 @@ SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const {
return serializer.Finish();
}

std::string ClientFragmentFrame::ToString() const {
return absl::StrCat(
"ClientFragmentFrame{stream_id=", stream_id, ", headers=",
headers.get() != nullptr ? headers->DebugString().c_str() : "nullptr",
", message=",
message.get() != nullptr ? message->DebugString().c_str() : "nullptr",
", message_padding=", message_padding, ", end_of_stream=", end_of_stream,
"}");
}

absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
const FrameHeader& header,
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) {
if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id");
}
frame_header = header;
stream_id = header.stream_id;
message_padding = header.message_padding;
FrameDeserializer deserializer(header, slice_buffer);
if (header.flags.is_set(0)) {
auto r =
Expand All @@ -238,6 +255,9 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
if (r.value() != nullptr) {
headers = std::move(r.value());
}
} else if (header.header_length != 0) {
return absl::InvalidArgumentError(absl::StrCat(
"Unexpected non-zero header length", header.header_length));
}
if (header.flags.is_set(1)) {
auto r =
Expand All @@ -247,13 +267,16 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
if (r.value() != nullptr) {
trailers = std::move(r.value());
}
} else if (header.trailer_length != 0) {
return absl::InvalidArgumentError(absl::StrCat(
"Unexpected non-zero trailer length", header.trailer_length));
}
return deserializer.Finish();
}

SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const {
GPR_ASSERT(frame_header.stream_id != 0);
FrameSerializer serializer(frame_header);
GPR_ASSERT(stream_id != 0);
FrameSerializer serializer(FrameType::kFragment, stream_id, message_padding);
if (headers.get() != nullptr) {
encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders());
}
Expand All @@ -263,6 +286,17 @@ SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const {
return serializer.Finish();
}

std::string ServerFragmentFrame::ToString() const {
return absl::StrCat(
"ServerFragmentFrame{stream_id=", stream_id, ", headers=",
headers.get() != nullptr ? headers->DebugString().c_str() : "nullptr",
", message=",
message.get() != nullptr ? message->DebugString().c_str() : "nullptr",
", message_padding=", message_padding, ", trailers=",
trailers.get() != nullptr ? trailers->DebugString().c_str() : "nullptr",
"}");
}

absl::Status CancelFrame::Deserialize(HPackParser*, const FrameHeader& header,
absl::BitGenRef,
SliceBuffer& slice_buffer) {
Expand All @@ -282,10 +316,13 @@ absl::Status CancelFrame::Deserialize(HPackParser*, const FrameHeader& header,

SliceBuffer CancelFrame::Serialize(HPackCompressor*) const {
GPR_ASSERT(stream_id != 0);
FrameSerializer serializer(
FrameHeader{FrameType::kCancel, {}, stream_id, 0, 0, 0, 0});
FrameSerializer serializer(FrameType::kCancel, stream_id, 0);
return serializer.Finish();
}

std::string CancelFrame::ToString() const {
return absl::StrCat("CancelFrame{stream_id=", stream_id, "}");
}

} // namespace chaotic_good
} // namespace grpc_core
18 changes: 12 additions & 6 deletions src/core/ext/transport/chaotic_good/frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) = 0;
virtual SliceBuffer Serialize(HPackCompressor* encoder) const = 0;
virtual std::string ToString() const = 0;

protected:
static bool EqVal(const Message& a, const Message& b) {
Expand All @@ -67,6 +68,7 @@ struct SettingsFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;

bool operator==(const SettingsFrame&) const { return true; }
};
Expand All @@ -76,15 +78,16 @@ struct ClientFragmentFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;

FrameHeader frame_header;
uint32_t stream_id;
ClientMetadataHandle headers;
MessageHandle message;
uint32_t message_padding;
bool end_of_stream = false;

bool operator==(const ClientFragmentFrame& other) const {
return frame_header.stream_id == other.frame_header.stream_id &&
EqHdl(headers, other.headers) &&
return stream_id == other.stream_id && EqHdl(headers, other.headers) &&
end_of_stream == other.end_of_stream;
}
};
Expand All @@ -94,15 +97,17 @@ struct ServerFragmentFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;

FrameHeader frame_header;
uint32_t stream_id;
ServerMetadataHandle headers;
MessageHandle message;
uint32_t message_padding;
ServerMetadataHandle trailers;

bool operator==(const ServerFragmentFrame& other) const {
return frame_header.stream_id == other.frame_header.stream_id &&
EqHdl(headers, other.headers) && EqHdl(trailers, other.trailers);
return stream_id == other.stream_id && EqHdl(headers, other.headers) &&
EqHdl(trailers, other.trailers);
}
};

Expand All @@ -111,6 +116,7 @@ struct CancelFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;

uint32_t stream_id;

Expand Down
Loading

0 comments on commit 9118cb1

Please sign in to comment.