Skip to content

Commit

Permalink
Be exception safe: storage rowset segment v2 (StarRocks#1022)
Browse files Browse the repository at this point in the history
make BE exception safe
modify and check the dir: storage/rowset/segment_v2
  • Loading branch information
qinpengxiang001 authored Nov 11, 2021
1 parent f43947b commit 94af746
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 40 deletions.
32 changes: 15 additions & 17 deletions be/src/storage/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,52 +373,50 @@ Status ColumnReader::bloom_filter(const std::vector<const vectorized::ColumnPred
Status ColumnReader::_load_ordinal_index(bool use_page_cache, bool kept_in_memory) {
Status st;
if (_flags[kHasOrdinalIndexMetaPos]) {
auto* index_meta = _ordinal_index.meta;
_ordinal_index.reader = new OrdinalIndexReader();
st = _ordinal_index.reader->load(_opts.block_mgr, _file_name, index_meta, _num_rows, use_page_cache,
kept_in_memory);
delete index_meta;
std::unique_ptr<OrdinalIndexPB> index_meta(_ordinal_index.meta);
_flags.set(kHasOrdinalIndexMetaPos, false);
_ordinal_index.reader = new OrdinalIndexReader();
_flags.set(kHasOrdinalIndexReaderPos, true);
st = _ordinal_index.reader->load(_opts.block_mgr, _file_name, index_meta.get(), _num_rows, use_page_cache,
kept_in_memory);
}
return st;
}

Status ColumnReader::_load_zone_map_index(bool use_page_cache, bool kept_in_memory) {
Status st;
if (_flags[kHasZoneMapIndexMetaPos]) {
auto* index_meta = _zone_map_index.meta;
_zone_map_index.reader = new ZoneMapIndexReader();
st = _zone_map_index.reader->load(_opts.block_mgr, _file_name, index_meta, use_page_cache, kept_in_memory);
delete index_meta;
std::unique_ptr<ZoneMapIndexPB> index_meta(_zone_map_index.meta);
_flags.set(kHasZoneMapIndexMetaPos, false);
_zone_map_index.reader = new ZoneMapIndexReader();
_flags.set(kHasZoneMapIndexReaderPos, true);
st = _zone_map_index.reader->load(_opts.block_mgr, _file_name, index_meta.get(), use_page_cache,
kept_in_memory);
}
return st;
}

Status ColumnReader::_load_bitmap_index(bool use_page_cache, bool kept_in_memory) {
Status st;
if (_flags[kHasBitmapIndexMetaPos]) {
auto* index_meta = _bitmap_index.meta;
_bitmap_index.reader = new BitmapIndexReader();
st = _bitmap_index.reader->load(_opts.block_mgr, _file_name, index_meta, use_page_cache, kept_in_memory);
delete index_meta;
std::unique_ptr<BitmapIndexPB> index_meta(_bitmap_index.meta);
_flags.set(kHasBitmapIndexMetaPos, false);
_bitmap_index.reader = new BitmapIndexReader();
_flags.set(kHasBitmapIndexReaderPos, true);
st = _bitmap_index.reader->load(_opts.block_mgr, _file_name, index_meta.get(), use_page_cache, kept_in_memory);
}
return st;
}

Status ColumnReader::_load_bloom_filter_index(bool use_page_cache, bool kept_in_memory) {
Status st;
if (_flags[kHasBloomFilterIndexMetaPos]) {
auto* index_meta = _bloom_filter_index.meta;
_bloom_filter_index.reader = new BloomFilterIndexReader();
st = _bloom_filter_index.reader->load(_opts.block_mgr, _file_name, index_meta, use_page_cache, kept_in_memory);
delete index_meta;
std::unique_ptr<BloomFilterIndexPB> index_meta(_bloom_filter_index.meta);
_flags.set(kHasBloomFilterIndexMetaPos, false);
_bloom_filter_index.reader = new BloomFilterIndexReader();
_flags.set(kHasBloomFilterIndexReaderPos, true);
st = _bloom_filter_index.reader->load(_opts.block_mgr, _file_name, index_meta.get(), use_page_cache,
kept_in_memory);
}
return st;
}
Expand Down
29 changes: 12 additions & 17 deletions be/src/storage/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
std::unique_ptr<ColumnWriter> element_writer;
RETURN_IF_ERROR(ColumnWriter::create(element_options, &element_column, _wblock, &element_writer));

ScalarColumnWriter* null_writer = nullptr;
std::unique_ptr<ScalarColumnWriter> null_writer = nullptr;
if (opts.meta->is_nullable()) {
ColumnWriterOptions null_options;
null_options.meta = opts.meta->add_children_columns();
Expand All @@ -307,7 +307,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
null_options.meta->set_compression(LZ4);
null_options.meta->set_is_nullable(false);
std::unique_ptr<Field> bool_field(FieldFactory::create_by_type(FieldType::OLAP_FIELD_TYPE_BOOL));
null_writer = new ScalarColumnWriter(null_options, std::move(bool_field), _wblock);
null_writer = std::make_unique<ScalarColumnWriter>(null_options, std::move(bool_field), _wblock);
}

ColumnWriterOptions array_size_options;
Expand All @@ -323,12 +323,10 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
array_size_options.need_bloom_filter = false;
array_size_options.need_bitmap_index = false;
std::unique_ptr<Field> bigint_field(FieldFactory::create_by_type(FieldType::OLAP_FIELD_TYPE_INT));
ScalarColumnWriter* offset_writer =
new ScalarColumnWriter(array_size_options, std::move(bigint_field), _wblock);

std::unique_ptr<ColumnWriter> writer_local = std::unique_ptr<ColumnWriter>(new ArrayColumnWriter(
opts, std::move(field), null_writer, offset_writer, std::move(element_writer)));
*writer = std::move(writer_local);
std::unique_ptr<ScalarColumnWriter> offset_writer =
std::make_unique<ScalarColumnWriter>(array_size_options, std::move(bigint_field), _wblock);
*writer = std::make_unique<ArrayColumnWriter>(opts, std::move(field), std::move(null_writer),
std::move(offset_writer), std::move(element_writer));
return Status::OK();
}
default:
Expand Down Expand Up @@ -757,16 +755,13 @@ Status ScalarColumnWriter::append(const uint8_t* data, const uint8_t* null_flags
////////////////////////////////////////////////////////////////////////////////

ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* null_writer, ScalarColumnWriter* offset_writer,
std::unique_ptr<ScalarColumnWriter> null_writer,
std::unique_ptr<ScalarColumnWriter> offset_writer,
std::unique_ptr<ColumnWriter> element_writer)
: ColumnWriter(std::move(field), opts.meta->is_nullable()), _element_writer(std::move(element_writer)) {
if (is_nullable()) {
_null_writer.reset(null_writer);
} else {
_null_writer = nullptr;
}
_array_size_writer.reset(offset_writer);
}
: ColumnWriter(std::move(field), opts.meta->is_nullable()),
_null_writer(std::move(null_writer)),
_array_size_writer(std::move(offset_writer)),
_element_writer(std::move(element_writer)) {}

Status ArrayColumnWriter::init() {
if (is_nullable()) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/storage/rowset/segment_v2/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ class ScalarColumnWriter final : public ColumnWriter {
class ArrayColumnWriter final : public ColumnWriter {
public:
explicit ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* null_writer, ScalarColumnWriter* offset_writer,
std::unique_ptr<ScalarColumnWriter> null_writer,
std::unique_ptr<ScalarColumnWriter> offset_writer,
std::unique_ptr<ColumnWriter> element_writer);
~ArrayColumnWriter() override = default;

Expand Down
5 changes: 3 additions & 2 deletions be/src/storage/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "util/crc32c.h"
#include "util/faststring.h"
#include "util/runtime_profile.h"
#include "util/scoped_cleanup.h"

namespace starrocks::segment_v2 {

Expand All @@ -43,6 +44,7 @@ using strings::Substitute;
Status PageIO::compress_page_body(const BlockCompressionCodec* codec, double min_space_saving,
const std::vector<Slice>& body, faststring* compressed_body) {
size_t uncompressed_size = Slice::compute_total_size(body);
auto cleanup = MakeScopedCleanup([&]() { compressed_body->clear(); });
if (codec != nullptr && codec->exceed_max_input_size(uncompressed_size)) {
compressed_body->clear();
return Status::OK();
Expand All @@ -57,11 +59,10 @@ Status PageIO::compress_page_body(const BlockCompressionCodec* codec, double min
// return compressed body only when it saves more than min_space_saving
if (space_saving > 0 && space_saving >= min_space_saving) {
compressed_body->shrink_to_fit();
cleanup.cancel();
return Status::OK();
}
}
// otherwise, do not compress
compressed_body->clear();
return Status::OK();
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/storage/rowset/segment_v2/rle_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ class RlePageBuilder final : public PageBuilder {
_first_value(0),
_last_value(0) {
_bit_width = (Type == OLAP_FIELD_TYPE_BOOL) ? 1 : SIZE_OF_TYPE * 8;
_rle_encoder = new RleEncoder<CppType>(&_buf, _bit_width);
_rle_encoder = std::make_unique<RleEncoder<CppType>>(&_buf, _bit_width);
reset();
}

~RlePageBuilder() override { delete _rle_encoder; }
~RlePageBuilder() override {}

bool is_page_full() override { return _rle_encoder->len() >= _options.data_page_size; }

Expand Down Expand Up @@ -140,7 +140,7 @@ class RlePageBuilder final : public PageBuilder {
size_t _count;
bool _finished;
int _bit_width;
RleEncoder<CppType>* _rle_encoder;
std::unique_ptr<RleEncoder<CppType>> _rle_encoder;
faststring _buf;
CppType _first_value;
CppType _last_value;
Expand Down

0 comments on commit 94af746

Please sign in to comment.