Skip to content

Commit

Permalink
Revert "Introduce grpc_byte_buffer_reader_peek and use it for Protobu…
Browse files Browse the repository at this point in the history
…f parsing."
  • Loading branch information
soheilhy authored Mar 5, 2019
1 parent 8b510d9 commit c060d55
Show file tree
Hide file tree
Showing 12 changed files with 11 additions and 194 deletions.
1 change: 0 additions & 1 deletion grpc.def
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ EXPORTS
grpc_byte_buffer_reader_init
grpc_byte_buffer_reader_destroy
grpc_byte_buffer_reader_next
grpc_byte_buffer_reader_peek
grpc_byte_buffer_reader_readall
grpc_raw_byte_buffer_from_reader
gpr_log_severity_string
Expand Down
13 changes: 0 additions & 13 deletions include/grpc/impl/codegen/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,6 @@ GRPCAPI void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader);
GRPCAPI int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
grpc_slice* slice);

/** EXPERIMENTAL API - This function may be removed and changed, in the future.
*
* Updates \a slice with the next piece of data from from \a reader and returns
* 1. Returns 0 at the end of the stream. Caller is responsible for making sure
* the slice pointer remains valid when accessed.
*
* NOTE: Do not use this function unless the caller can guarantee that the
* underlying grpc_byte_buffer outlasts the use of the slice. This is only
* safe when the underlying grpc_byte_buffer remains immutable while slice
* is being accessed. */
GRPCAPI int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
grpc_slice** slice);

/** Merge all data from \a reader into single slice */
GRPCAPI grpc_slice
grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader* reader);
Expand Down
2 changes: 0 additions & 2 deletions include/grpcpp/impl/codegen/core_codegen.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ class CoreCodegen final : public CoreCodegenInterface {
grpc_byte_buffer_reader* reader) override;
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
grpc_slice* slice) override;
int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
grpc_slice** slice) override;

grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) override;
Expand Down
2 changes: 0 additions & 2 deletions include/grpcpp/impl/codegen/core_codegen_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ class CoreCodegenInterface {
grpc_byte_buffer_reader* reader) = 0;
virtual int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
grpc_slice* slice) = 0;
virtual int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
grpc_slice** slice) = 0;

virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) = 0;
Expand Down
18 changes: 9 additions & 9 deletions include/grpcpp/impl/codegen/proto_buffer_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,23 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
}
/// If we have backed up previously, we need to return the backed-up slice
if (backup_count_ > 0) {
*data = GRPC_SLICE_START_PTR(*slice_) + GRPC_SLICE_LENGTH(*slice_) -
*data = GRPC_SLICE_START_PTR(slice_) + GRPC_SLICE_LENGTH(slice_) -
backup_count_;
GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX);
*size = (int)backup_count_;
backup_count_ = 0;
return true;
}
/// Otherwise get the next slice from the byte buffer reader
if (!g_core_codegen_interface->grpc_byte_buffer_reader_peek(&reader_,
if (!g_core_codegen_interface->grpc_byte_buffer_reader_next(&reader_,
&slice_)) {
return false;
}
*data = GRPC_SLICE_START_PTR(*slice_);
g_core_codegen_interface->grpc_slice_unref(slice_);
*data = GRPC_SLICE_START_PTR(slice_);
// On win x64, int is only 32bit
GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(*slice_) <= INT_MAX);
byte_count_ += * size = (int)GRPC_SLICE_LENGTH(*slice_);
GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX);
byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_);
return true;
}

Expand All @@ -99,7 +100,7 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
/// bytes that have already been returned by the last call of Next.
/// So do the backup and have that ready for a later Next.
void BackUp(int count) override {
GPR_CODEGEN_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(*slice_)));
GPR_CODEGEN_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_)));
backup_count_ = count;
}

Expand Down Expand Up @@ -134,15 +135,14 @@ class ProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream {
int64_t backup_count() { return backup_count_; }
void set_backup_count(int64_t backup_count) { backup_count_ = backup_count; }
grpc_byte_buffer_reader* reader() { return &reader_; }
grpc_slice* slice() { return slice_; }
grpc_slice** mutable_slice_ptr() { return &slice_; }
grpc_slice* slice() { return &slice_; }

private:
int64_t byte_count_; ///< total bytes read since object creation
int64_t backup_count_; ///< how far backed up in the stream we are
grpc_byte_buffer_reader reader_; ///< internal object to read \a grpc_slice
///< from the \a grpc_byte_buffer
grpc_slice* slice_; ///< current slice passed back to the caller
grpc_slice slice_; ///< current slice passed back to the caller
Status status_; ///< status of the entire object
};

Expand Down
17 changes: 0 additions & 17 deletions src/core/lib/surface/byte_buffer_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,6 @@ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) {
}
}

int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
grpc_slice** slice) {
switch (reader->buffer_in->type) {
case GRPC_BB_RAW: {
grpc_slice_buffer* slice_buffer;
slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
if (reader->current.index < slice_buffer->count) {
*slice = &slice_buffer->slices[reader->current.index];
reader->current.index += 1;
return 1;
}
break;
}
}
return 0;
}

int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
grpc_slice* slice) {
switch (reader->buffer_in->type) {
Expand Down
5 changes: 0 additions & 5 deletions src/cpp/common/core_codegen.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ int CoreCodegen::grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
return ::grpc_byte_buffer_reader_next(reader, slice);
}

int CoreCodegen::grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
grpc_slice** slice) {
return ::grpc_byte_buffer_reader_peek(reader, slice);
}

grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) {
return ::grpc_raw_byte_buffer_create(slice, nslices);
Expand Down
2 changes: 0 additions & 2 deletions src/ruby/ext/grpc/rb_grpc_imports.generated.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ grpc_byte_buffer_destroy_type grpc_byte_buffer_destroy_import;
grpc_byte_buffer_reader_init_type grpc_byte_buffer_reader_init_import;
grpc_byte_buffer_reader_destroy_type grpc_byte_buffer_reader_destroy_import;
grpc_byte_buffer_reader_next_type grpc_byte_buffer_reader_next_import;
grpc_byte_buffer_reader_peek_type grpc_byte_buffer_reader_peek_import;
grpc_byte_buffer_reader_readall_type grpc_byte_buffer_reader_readall_import;
grpc_raw_byte_buffer_from_reader_type grpc_raw_byte_buffer_from_reader_import;
gpr_log_severity_string_type gpr_log_severity_string_import;
Expand Down Expand Up @@ -441,7 +440,6 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_byte_buffer_reader_init_import = (grpc_byte_buffer_reader_init_type) GetProcAddress(library, "grpc_byte_buffer_reader_init");
grpc_byte_buffer_reader_destroy_import = (grpc_byte_buffer_reader_destroy_type) GetProcAddress(library, "grpc_byte_buffer_reader_destroy");
grpc_byte_buffer_reader_next_import = (grpc_byte_buffer_reader_next_type) GetProcAddress(library, "grpc_byte_buffer_reader_next");
grpc_byte_buffer_reader_peek_import = (grpc_byte_buffer_reader_peek_type) GetProcAddress(library, "grpc_byte_buffer_reader_peek");
grpc_byte_buffer_reader_readall_import = (grpc_byte_buffer_reader_readall_type) GetProcAddress(library, "grpc_byte_buffer_reader_readall");
grpc_raw_byte_buffer_from_reader_import = (grpc_raw_byte_buffer_from_reader_type) GetProcAddress(library, "grpc_raw_byte_buffer_from_reader");
gpr_log_severity_string_import = (gpr_log_severity_string_type) GetProcAddress(library, "gpr_log_severity_string");
Expand Down
3 changes: 0 additions & 3 deletions src/ruby/ext/grpc/rb_grpc_imports.generated.h
Original file line number Diff line number Diff line change
Expand Up @@ -491,9 +491,6 @@ extern grpc_byte_buffer_reader_destroy_type grpc_byte_buffer_reader_destroy_impo
typedef int(*grpc_byte_buffer_reader_next_type)(grpc_byte_buffer_reader* reader, grpc_slice* slice);
extern grpc_byte_buffer_reader_next_type grpc_byte_buffer_reader_next_import;
#define grpc_byte_buffer_reader_next grpc_byte_buffer_reader_next_import
typedef int(*grpc_byte_buffer_reader_peek_type)(grpc_byte_buffer_reader* reader, grpc_slice** slice);
extern grpc_byte_buffer_reader_peek_type grpc_byte_buffer_reader_peek_import;
#define grpc_byte_buffer_reader_peek grpc_byte_buffer_reader_peek_import
typedef grpc_slice(*grpc_byte_buffer_reader_readall_type)(grpc_byte_buffer_reader* reader);
extern grpc_byte_buffer_reader_readall_type grpc_byte_buffer_reader_readall_import;
#define grpc_byte_buffer_reader_readall grpc_byte_buffer_reader_readall_import
Expand Down
70 changes: 0 additions & 70 deletions test/core/surface/byte_buffer_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,73 +101,6 @@ static void test_read_none_compressed_slice(void) {
grpc_byte_buffer_destroy(buffer);
}

static void test_peek_one_slice(void) {
grpc_slice slice;
grpc_byte_buffer* buffer;
grpc_byte_buffer_reader reader;
grpc_slice* first_slice;
grpc_slice* second_slice;
int first_code, second_code;

LOG_TEST("test_peek_one_slice");
slice = grpc_slice_from_copied_string("test");
buffer = grpc_raw_byte_buffer_create(&slice, 1);
grpc_slice_unref(slice);
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) &&
"Couldn't init byte buffer reader");
first_code = grpc_byte_buffer_reader_peek(&reader, &first_slice);
GPR_ASSERT(first_code != 0);
GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(*first_slice), "test", 4) == 0);
second_code = grpc_byte_buffer_reader_peek(&reader, &second_slice);
GPR_ASSERT(second_code == 0);
grpc_byte_buffer_destroy(buffer);
}

static void test_peek_one_slice_malloc(void) {
grpc_slice slice;
grpc_byte_buffer* buffer;
grpc_byte_buffer_reader reader;
grpc_slice* first_slice;
grpc_slice* second_slice;
int first_code, second_code;

LOG_TEST("test_peek_one_slice_malloc");
slice = grpc_slice_malloc(4);
memcpy(GRPC_SLICE_START_PTR(slice), "test", 4);
buffer = grpc_raw_byte_buffer_create(&slice, 1);
grpc_slice_unref(slice);
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) &&
"Couldn't init byte buffer reader");
first_code = grpc_byte_buffer_reader_peek(&reader, &first_slice);
GPR_ASSERT(first_code != 0);
GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(*first_slice), "test", 4) == 0);
second_code = grpc_byte_buffer_reader_peek(&reader, &second_slice);
GPR_ASSERT(second_code == 0);
grpc_byte_buffer_destroy(buffer);
}

static void test_peek_none_compressed_slice(void) {
grpc_slice slice;
grpc_byte_buffer* buffer;
grpc_byte_buffer_reader reader;
grpc_slice* first_slice;
grpc_slice* second_slice;
int first_code, second_code;

LOG_TEST("test_peek_none_compressed_slice");
slice = grpc_slice_from_copied_string("test");
buffer = grpc_raw_byte_buffer_create(&slice, 1);
grpc_slice_unref(slice);
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) &&
"Couldn't init byte buffer reader");
first_code = grpc_byte_buffer_reader_peek(&reader, &first_slice);
GPR_ASSERT(first_code != 0);
GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(*first_slice), "test", 4) == 0);
second_code = grpc_byte_buffer_reader_peek(&reader, &second_slice);
GPR_ASSERT(second_code == 0);
grpc_byte_buffer_destroy(buffer);
}

static void test_read_corrupted_slice(void) {
grpc_slice slice;
grpc_byte_buffer* buffer;
Expand Down Expand Up @@ -338,9 +271,6 @@ int main(int argc, char** argv) {
test_read_one_slice();
test_read_one_slice_malloc();
test_read_none_compressed_slice();
test_peek_one_slice();
test_peek_one_slice_malloc();
test_peek_none_compressed_slice();
test_read_gzip_compressed_slice();
test_read_deflate_compressed_slice();
test_read_corrupted_slice();
Expand Down
1 change: 0 additions & 1 deletion test/core/surface/public_headers_must_be_c89.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ int main(int argc, char **argv) {
printf("%lx", (unsigned long) grpc_byte_buffer_reader_init);
printf("%lx", (unsigned long) grpc_byte_buffer_reader_destroy);
printf("%lx", (unsigned long) grpc_byte_buffer_reader_next);
printf("%lx", (unsigned long) grpc_byte_buffer_reader_peek);
printf("%lx", (unsigned long) grpc_byte_buffer_reader_readall);
printf("%lx", (unsigned long) grpc_raw_byte_buffer_from_reader);
printf("%lx", (unsigned long) gpr_log_severity_string);
Expand Down
71 changes: 2 additions & 69 deletions test/cpp/microbenchmarks/bm_byte_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
namespace grpc {
namespace testing {

auto& force_library_initialization = Library::get();

static void BM_ByteBuffer_Copy(benchmark::State& state) {
Library::get();
int num_slices = state.range(0);
size_t slice_size = state.range(1);
std::vector<grpc::Slice> slices;
Expand All @@ -47,74 +48,6 @@ static void BM_ByteBuffer_Copy(benchmark::State& state) {
}
BENCHMARK(BM_ByteBuffer_Copy)->Ranges({{1, 64}, {1, 1024 * 1024}});

static void BM_ByteBufferReader_Next(benchmark::State& state) {
Library::get();
const int num_slices = state.range(0);
constexpr size_t kSliceSize = 16;
std::vector<grpc_slice> slices;
for (int i = 0; i < num_slices; ++i) {
std::unique_ptr<char[]> buf(new char[kSliceSize]);
slices.emplace_back(g_core_codegen_interface->grpc_slice_from_copied_buffer(
buf.get(), kSliceSize));
}
grpc_byte_buffer* bb = g_core_codegen_interface->grpc_raw_byte_buffer_create(
slices.data(), num_slices);
grpc_byte_buffer_reader reader;
GPR_ASSERT(
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
while (state.KeepRunning()) {
grpc_slice* slice;
if (GPR_UNLIKELY(!g_core_codegen_interface->grpc_byte_buffer_reader_peek(
&reader, &slice))) {
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
GPR_ASSERT(
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
continue;
}
}

g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
g_core_codegen_interface->grpc_byte_buffer_destroy(bb);
for (auto& slice : slices) {
g_core_codegen_interface->grpc_slice_unref(slice);
}
}
BENCHMARK(BM_ByteBufferReader_Next)->Ranges({{64 * 1024, 1024 * 1024}});

static void BM_ByteBufferReader_Peek(benchmark::State& state) {
Library::get();
const int num_slices = state.range(0);
constexpr size_t kSliceSize = 16;
std::vector<grpc_slice> slices;
for (int i = 0; i < num_slices; ++i) {
std::unique_ptr<char[]> buf(new char[kSliceSize]);
slices.emplace_back(g_core_codegen_interface->grpc_slice_from_copied_buffer(
buf.get(), kSliceSize));
}
grpc_byte_buffer* bb = g_core_codegen_interface->grpc_raw_byte_buffer_create(
slices.data(), num_slices);
grpc_byte_buffer_reader reader;
GPR_ASSERT(
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
while (state.KeepRunning()) {
grpc_slice* slice;
if (GPR_UNLIKELY(!g_core_codegen_interface->grpc_byte_buffer_reader_peek(
&reader, &slice))) {
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
GPR_ASSERT(
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
continue;
}
}

g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
g_core_codegen_interface->grpc_byte_buffer_destroy(bb);
for (auto& slice : slices) {
g_core_codegen_interface->grpc_slice_unref(slice);
}
}
BENCHMARK(BM_ByteBufferReader_Peek)->Ranges({{64 * 1024, 1024 * 1024}});

} // namespace testing
} // namespace grpc

Expand Down

0 comments on commit c060d55

Please sign in to comment.