Skip to content

Commit

Permalink
Make some parts of storage module exception safe (StarRocks#1153)
Browse files Browse the repository at this point in the history
  • Loading branch information
qinpengxiang001 authored Nov 12, 2021
1 parent 5b48617 commit 1d3621f
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 68 deletions.
9 changes: 6 additions & 3 deletions be/src/storage/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "storage/storage_engine.h"
#include "storage/tablet_meta_manager.h"
#include "storage/utils.h" // for check_dir_existed
#include "util/defer_op.h"
#include "util/errno.h"
#include "util/file_utils.h"
#include "util/monotime.h"
Expand Down Expand Up @@ -127,18 +128,20 @@ Status DataDir::_init_cluster_id() {
"open file failed");
}

int lock_res = flock(fp->_fileno, LOCK_EX | LOCK_NB);
if (lock_res < 0) {
DeferOp close_fp([&]() {
fclose(fp);
fp = nullptr;
});

int lock_res = flock(fp->_fileno, LOCK_EX | LOCK_NB);
if (lock_res < 0) {
RETURN_IF_ERROR_WITH_WARN(
Status::IOError(strings::Substitute("failed to flock cluster id file $0", cluster_id_path)),
"flock file failed");
}

// obtain cluster id of all root paths
auto st = _read_cluster_id(cluster_id_path, &_cluster_id);
fclose(fp);
return st;
}

Expand Down
10 changes: 10 additions & 0 deletions be/src/storage/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "storage/olap_common.h"
#include "storage/olap_cond.h"
#include "storage/utils.h"
#include "util/scoped_cleanup.h"

using apache::thrift::ThriftDebugString;
using std::numeric_limits;
Expand All @@ -50,6 +51,13 @@ using google::protobuf::RepeatedPtrField;

namespace starrocks {

DeleteHandler::~DeleteHandler() {
for (auto& _del_cond : _del_conds) {
delete _del_cond.del_cond;
}
_del_conds.clear();
}

OLAPStatus DeleteConditionHandler::generate_delete_predicate(const TabletSchema& schema,
const std::vector<TCondition>& conditions,
DeletePredicatePB* del_pred) {
Expand Down Expand Up @@ -267,6 +275,7 @@ OLAPStatus DeleteHandler::init(const TabletSchema& schema, const DelPredicateArr
temp.filter_version = it->version();

temp.del_cond = new (std::nothrow) Conditions();
ScopedCleanup del_cond_delete_guard([&]() { delete temp.del_cond; });

if (temp.del_cond == nullptr) {
LOG(FATAL) << "fail to malloc Conditions. size=" << sizeof(Conditions);
Expand Down Expand Up @@ -309,6 +318,7 @@ OLAPStatus DeleteHandler::init(const TabletSchema& schema, const DelPredicateArr
}

_del_conds.push_back(temp);
del_cond_delete_guard.cancel();
}

_is_inited = true;
Expand Down
3 changes: 1 addition & 2 deletions be/src/storage/delete_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ class DeleteConditionHandler {
struct DeleteConditions {
DeleteConditions() {}
~DeleteConditions() = default;

int32_t filter_version{0}; // delete condition version
Conditions* del_cond{nullptr}; // delete condition
};
Expand All @@ -93,7 +92,7 @@ class DeleteHandler {
typedef std::vector<DeleteConditions>::size_type cond_num_t;

DeleteHandler() {}
~DeleteHandler() = default;
~DeleteHandler();

// Use regular expression to extract 'column_name', 'op' and 'operands'
static bool parse_condition(const std::string& condition_str, TCondition* condition);
Expand Down
40 changes: 20 additions & 20 deletions be/src/storage/field.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ class Field {
virtual size_t get_variable_len() const { return 0; }

virtual Field* clone() const {
auto* local = new Field();
this->clone(local);
return local;
std::unique_ptr<Field> local = std::make_unique<Field>();
this->clone(local.get());
return local.release();
}

// Test if these two cell is equal with each other
Expand Down Expand Up @@ -455,9 +455,9 @@ class CharField : public Field {
}

CharField* clone() const override {
auto* local = new CharField();
Field::clone(local);
return local;
std::unique_ptr<CharField> local = std::make_unique<CharField>();
Field::clone(local.get());
return local.release();
}

char* allocate_value(MemPool* pool) const override { return Field::allocate_string_value(pool); }
Expand Down Expand Up @@ -486,9 +486,9 @@ class VarcharField : public Field {
}

VarcharField* clone() const override {
auto* local = new VarcharField();
Field::clone(local);
return local;
std::unique_ptr<VarcharField> local = std::make_unique<VarcharField>();
Field::clone(local.get());
return local.release();
}

char* allocate_value(MemPool* pool) const override { return Field::allocate_string_value(pool); }
Expand Down Expand Up @@ -518,9 +518,9 @@ class BitmapAggField : public Field {
}

BitmapAggField* clone() const override {
auto* local = new BitmapAggField();
Field::clone(local);
return local;
std::unique_ptr<BitmapAggField> local = std::make_unique<BitmapAggField>();
Field::clone(local.get());
return local.release();
}
};

Expand All @@ -542,9 +542,9 @@ class HllAggField : public Field {
}

HllAggField* clone() const override {
auto* local = new HllAggField();
Field::clone(local);
return local;
std::unique_ptr<HllAggField> local = std::make_unique<HllAggField>();
Field::clone(local.get());
return local.release();
}
};

Expand All @@ -566,9 +566,9 @@ class PercentileAggField : public Field {
}

PercentileAggField* clone() const override {
auto* local = new PercentileAggField();
Field::clone(local);
return local;
std::unique_ptr<PercentileAggField> local = std::make_unique<PercentileAggField>();
Field::clone(local.get());
return local.release();
}
};

Expand Down Expand Up @@ -617,9 +617,9 @@ class FieldFactory {
return new VarcharField(column);
case OLAP_FIELD_TYPE_ARRAY: {
std::unique_ptr<Field> item_field(FieldFactory::create(column.subcolumn(0)));
auto* local = new Field(column);
std::unique_ptr<Field> local = std::make_unique<Field>(column);
local->add_sub_field(std::move(item_field));
return local;
return local.release();
}
case OLAP_FIELD_TYPE_DECIMAL32:
return new Field(column, std::make_shared<DecimalTypeInfo<OLAP_FIELD_TYPE_DECIMAL32>>(
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/olap_cond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "storage/olap_define.h"
#include "storage/utils.h"
#include "storage/wrapper_field.h"
#include "util/scoped_cleanup.h"

using std::nothrow;
using std::pair;
Expand Down Expand Up @@ -579,7 +580,9 @@ OLAPStatus Conditions::append_condition(const TCondition& tcond) {
auto it = _columns.find(index);
if (it == _columns.end()) {
auto cond_col = new CondColumn(*_schema, index);
ScopedCleanup free_guard([&] { delete cond_col; });
_columns[index] = cond_col;
free_guard.cancel();
return cond_col->add_cond(tcond, column);
} else {
return it->second->add_cond(tcond, column);
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "storage/tablet.h"
#include "util/date_func.h"
#include "util/mem_util.hpp"
#include "util/scoped_cleanup.h"

using std::nothrow;
using std::set;
Expand Down Expand Up @@ -727,7 +728,9 @@ void Reader::_init_conditions_param(const ReaderParams& read_params) {
_conditions.append_condition(condition);
ColumnPredicate* predicate = _parse_to_predicate(condition);
if (predicate != nullptr) {
ScopedCleanup free_guard([&]() { delete predicate; });
_col_predicates.push_back(predicate);
free_guard.cancel();
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions be/src/storage/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "storage/schema.h"

#include "storage/row_block2.h"
#include "util/scoped_cleanup.h"

namespace starrocks {

Expand All @@ -46,9 +47,15 @@ void Schema::_copy_from(const Schema& other) {
// Deep copy _cols
// TODO(lingbin): really need clone?
_cols.resize(other._cols.size(), nullptr);
ScopedCleanup release_guard([&] {
for (auto col : _cols) {
delete col;
}
});
for (auto cid : _col_ids) {
_cols[cid] = other._cols[cid]->clone();
}
release_guard.cancel();
}

void Schema::_init(const std::vector<TabletColumn>& cols, const std::vector<ColumnId>& col_ids,
Expand Down
45 changes: 30 additions & 15 deletions be/src/storage/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,22 @@ Status StorageEngine::_open() {
}

Status StorageEngine::_init_store_map() {
std::vector<DataDir*> tmp_stores;
std::vector<std::pair<bool, DataDir*>> tmp_stores;
ScopedCleanup release_guard([&] {
for (const auto& item : tmp_stores) {
if (item.first) {
delete item.second;
}
}
});
std::vector<std::thread> threads;
SpinLock error_msg_lock;
std::string error_msg;
for (auto& path : _options.store_paths) {
DataDir* store = new DataDir(path.path, path.storage_medium, _tablet_manager.get(), _txn_manager.get());
tmp_stores.emplace_back(store);
ScopedCleanup store_release_guard([&]() { delete store; });
tmp_stores.emplace_back(true, store);
store_release_guard.cancel();
threads.emplace_back([store, &error_msg_lock, &error_msg]() {
auto st = store->init();
if (!st.ok()) {
Expand All @@ -205,14 +214,12 @@ Status StorageEngine::_init_store_map() {
}

if (!error_msg.empty()) {
for (auto store : tmp_stores) {
delete store;
}
return Status::InternalError(strings::Substitute("init path failed, error=$0", error_msg));
}

for (auto store : tmp_stores) {
_store_map.emplace(store->path(), store);
for (auto& store : tmp_stores) {
_store_map.emplace(store.second->path(), store.second);
store.first = false;
}
return Status::OK();
}
Expand Down Expand Up @@ -921,20 +928,24 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) {
task->get_related_tablets(&tablet_infos);
sort(tablet_infos.begin(), tablet_infos.end());
std::vector<TabletSharedPtr> related_tablets;
DeferOp release_lock([&]() {
for (TabletSharedPtr& tablet : related_tablets) {
tablet->release_header_lock();
}
});
for (TabletInfo& tablet_info : tablet_infos) {
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_info.tablet_id);
if (tablet != nullptr) {
related_tablets.push_back(tablet);
tablet->obtain_header_wrlock();
ScopedCleanup release_guard([&]() { tablet->release_header_lock(); });
related_tablets.push_back(tablet);
release_guard.cancel();
} else {
LOG(WARNING) << "could not get tablet before prepare tabletid: " << tablet_info.tablet_id;
}
}
// add write lock to all related tablets
OLAPStatus prepare_status = task->prepare();
for (TabletSharedPtr& tablet : related_tablets) {
tablet->release_header_lock();
}
if (prepare_status != OLAP_SUCCESS) {
return prepare_status;
}
Expand All @@ -955,20 +966,24 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) {
task->get_related_tablets(&tablet_infos);
sort(tablet_infos.begin(), tablet_infos.end());
std::vector<TabletSharedPtr> related_tablets;
DeferOp release_lock([&]() {
for (TabletSharedPtr& tablet : related_tablets) {
tablet->release_header_lock();
}
});
for (TabletInfo& tablet_info : tablet_infos) {
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_info.tablet_id);
if (tablet != nullptr) {
related_tablets.push_back(tablet);
tablet->obtain_header_wrlock();
ScopedCleanup release_guard([&]() { tablet->release_header_lock(); });
related_tablets.push_back(tablet);
release_guard.cancel();
} else {
LOG(WARNING) << "Fail to get tablet before finish tablet_id=" << tablet_info.tablet_id;
}
}
// add write lock to all related tablets
OLAPStatus fin_status = task->finish();
for (TabletSharedPtr& tablet : related_tablets) {
tablet->release_header_lock();
}
return fin_status;
}
}
Expand Down
9 changes: 4 additions & 5 deletions be/src/storage/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,8 @@ void TabletMeta::init_from_pb(TabletMetaPB* ptablet_meta_pb) {

// generate AlterTabletTask
if (tablet_meta_pb.has_alter_task()) {
AlterTabletTask* alter_tablet_task = new AlterTabletTask();
alter_tablet_task->init_from_pb(tablet_meta_pb.alter_task());
_alter_task.reset(alter_tablet_task);
_alter_task = std::make_shared<AlterTabletTask>();
_alter_task->init_from_pb(tablet_meta_pb.alter_task());
}

if (tablet_meta_pb.has_in_restore_mode()) {
Expand Down Expand Up @@ -766,9 +765,9 @@ Status TabletMeta::set_alter_state(AlterTabletState alter_state) {
LOG(WARNING) << "original alter task is null, could not set state";
return Status::InternalError("original alter task is null");
} else {
auto alter_tablet_task = new AlterTabletTask(*_alter_task);
auto alter_tablet_task = std::make_shared<AlterTabletTask>(*_alter_task);
RETURN_IF_ERROR(alter_tablet_task->set_alter_state(alter_state));
_alter_task.reset(alter_tablet_task);
_alter_task = alter_tablet_task;
return Status::OK();
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/tablet_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "storage/rocksdb_status_adapter.h"
#include "storage/storage_engine.h"
#include "storage/tablet_updates.h"
#include "util/defer_op.h"
#include "util/url_coding.h"

namespace starrocks {
Expand Down
Loading

0 comments on commit 1d3621f

Please sign in to comment.