Skip to content

Commit

Permalink
Make BE runtime dir exception safe (StarRocks#1010)
Browse files Browse the repository at this point in the history
  • Loading branch information
starrocks-xueli authored Nov 12, 2021
1 parent d039609 commit b513f91
Show file tree
Hide file tree
Showing 33 changed files with 189 additions and 251 deletions.
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ bool ResultSinkOperator::need_input() const {
return true;
}
auto* mysql_writer = down_cast<MysqlResultWriter*>(_writer.get());
auto status = mysql_writer->try_add_batch(_fetch_data_result);
auto status = mysql_writer->try_add_batch(std::move(_fetch_data_result));
if (status.ok()) {
return status.value();
} else {
Expand All @@ -85,7 +85,7 @@ Status ResultSinkOperator::push_chunk(RuntimeState* state, const vectorized::Chu
auto status = mysql_writer->process_chunk(chunk.get());
if (status.ok()) {
_fetch_data_result = std::move(status.value());
return mysql_writer->try_add_batch(_fetch_data_result).status();
return mysql_writer->try_add_batch(std::move(_fetch_data_result)).status();
} else {
return status.status();
}
Expand Down
39 changes: 13 additions & 26 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics
delete this;
}

void GetResultBatchCtx::on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos) {
void GetResultBatchCtx::on_data(const TFetchDataResultPtr& t_result, int64_t packet_seq, bool eos) {
uint8_t* buf = nullptr;
uint32_t len = 0;
ThriftSerializer ser(false, 4096);
Expand All @@ -77,18 +77,13 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size)

BufferControlBlock::~BufferControlBlock() {
cancel();

for (ResultQueue::iterator iter = _batch_queue.begin(); _batch_queue.end() != iter; ++iter) {
delete *iter;
*iter = NULL;
}
}

Status BufferControlBlock::init() {
return Status::OK();
}

Status BufferControlBlock::add_batch(TFetchDataResult* result) {
Status BufferControlBlock::add_batch(TFetchDataResultPtr result) {
std::unique_lock<std::mutex> l(_lock);

if (_is_cancelled) {
Expand All @@ -107,19 +102,18 @@ Status BufferControlBlock::add_batch(TFetchDataResult* result) {

if (_waiting_rpc.empty()) {
_buffer_rows += num_rows;
_batch_queue.push_back(result);
_batch_queue.push_back(std::move(result));
_data_arriaval.notify_one();
} else {
auto* ctx = _waiting_rpc.front();
_waiting_rpc.pop_front();
ctx->on_data(result, _packet_num);
delete result;
_packet_num++;
}
return Status::OK();
}

StatusOr<bool> BufferControlBlock::try_add_batch(TFetchDataResult* result) {
StatusOr<bool> BufferControlBlock::try_add_batch(TFetchDataResultPtr result) {
std::unique_lock<std::mutex> l(_lock);

if (_is_cancelled) {
Expand All @@ -134,20 +128,19 @@ StatusOr<bool> BufferControlBlock::try_add_batch(TFetchDataResult* result) {

if (_waiting_rpc.empty()) {
_buffer_rows += num_rows;
_batch_queue.push_back(result);
_batch_queue.push_back(std::move(result));
_data_arriaval.notify_one();
} else {
auto* ctx = _waiting_rpc.front();
_waiting_rpc.pop_front();
ctx->on_data(result, _packet_num);
delete result;
_packet_num++;
}
return true;
}

Status BufferControlBlock::get_batch(TFetchDataResult* result) {
TFetchDataResult* item = nullptr;
Status BufferControlBlock::get_batch(TFetchDataResultPtr* result) {
TFetchDataResultPtr item;
{
std::unique_lock<std::mutex> l(_lock);

Expand All @@ -166,8 +159,8 @@ Status BufferControlBlock::get_batch(TFetchDataResult* result) {
if (_batch_queue.empty()) {
if (_is_close) {
// no result, normal end
result->eos = true;
result->__set_packet_num(_packet_num);
(*result)->eos = true;
(*result)->__set_packet_num(_packet_num);
_packet_num++;
return Status::OK();
} else {
Expand All @@ -177,17 +170,14 @@ Status BufferControlBlock::get_batch(TFetchDataResult* result) {
}

// get result
item = _batch_queue.front();
item = std::move(_batch_queue.front());
_batch_queue.pop_front();
_buffer_rows -= item->result_batch.rows.size();
_data_removal.notify_one();
}
swap(*result, *item);
result->__set_packet_num(_packet_num);
(*result).swap(item);
(*result)->__set_packet_num(_packet_num);
_packet_num++;
// destruct item new from Result writer
delete item;
item = nullptr;

return Status::OK();
}
Expand All @@ -204,17 +194,14 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
}
if (!_batch_queue.empty()) {
// get result
TFetchDataResult* result = _batch_queue.front();
TFetchDataResultPtr result = std::move(_batch_queue.front());
_batch_queue.pop_front();
_buffer_rows -= result->result_batch.rows.size();
_data_removal.notify_one();

ctx->on_data(result, _packet_num);
_packet_num++;

delete result;
result = nullptr;

return;
}
if (_is_close) {
Expand Down
12 changes: 7 additions & 5 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ namespace starrocks {
class TFetchDataResult;
class PFetchDataResult;

using TFetchDataResultPtr = std::unique_ptr<TFetchDataResult>;

struct GetResultBatchCtx {
brpc::Controller* cntl = nullptr;
PFetchDataResult* result = nullptr;
Expand All @@ -58,7 +60,7 @@ struct GetResultBatchCtx {

void on_failure(const Status& status);
void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr);
void on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos = false);
void on_data(const TFetchDataResultPtr& t_result, int64_t packet_seq, bool eos = false);
};

// buffer used for result customer and productor
Expand All @@ -68,12 +70,12 @@ class BufferControlBlock {
~BufferControlBlock();

Status init();
Status add_batch(TFetchDataResult* result);
Status add_batch(TFetchDataResultPtr result);
// non-blocking version of add_batch
StatusOr<bool> try_add_batch(TFetchDataResult* result);
StatusOr<bool> try_add_batch(TFetchDataResultPtr result);

// get result from batch, use timeout?
Status get_batch(TFetchDataResult* result);
Status get_batch(TFetchDataResultPtr* result);

void get_batch(GetResultBatchCtx* ctx);

Expand All @@ -99,7 +101,7 @@ class BufferControlBlock {
}

private:
typedef std::list<TFetchDataResult*> ResultQueue;
typedef std::list<TFetchDataResultPtr> ResultQueue;

// result's query id
TUniqueId _fragment_id;
Expand Down
17 changes: 5 additions & 12 deletions be/src/runtime/client_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ inline std::size_t hash_value(const TNetworkAddress& host_port) {
return HashUtil::hash(&host_port.port, sizeof(host_port.port), hash);
}

ClientCacheHelper::~ClientCacheHelper() {
for (auto& it : _client_map) {
delete it.second;
}
}
ClientCacheHelper::~ClientCacheHelper() {}

Status ClientCacheHelper::get_client(const TNetworkAddress& hostport, const client_factory& factory_method,
void** client_key, int timeout_ms) {
Expand Down Expand Up @@ -81,7 +77,7 @@ Status ClientCacheHelper::reopen_client(const client_factory& factory_method, vo
std::lock_guard<std::mutex> lock(_lock);
ClientMap::iterator i = _client_map.find(*client_key);
DCHECK(i != _client_map.end());
ThriftClientImpl* info = i->second;
std::unique_ptr<ThriftClientImpl>& info = i->second;
const std::string ipaddress = info->ipaddress();
int port = info->port();

Expand All @@ -91,7 +87,6 @@ Status ClientCacheHelper::reopen_client(const client_factory& factory_method, vo
// not clean up internal buffers it reopens. To work around this issue, create a new
// client instead.
_client_map.erase(*client_key);
delete info;
*client_key = nullptr;

if (_metrics_enabled) {
Expand Down Expand Up @@ -122,7 +117,7 @@ Status ClientCacheHelper::create_client(const TNetworkAddress& hostport, const c
}

// Because the client starts life 'checked out', we don't add it to the cache map
_client_map[*client_key] = client_impl.release();
_client_map[*client_key] = std::move(client_impl);

if (_metrics_enabled) {
_opened_clients->increment(1);
Expand All @@ -136,15 +131,14 @@ void ClientCacheHelper::release_client(void** client_key) {
std::lock_guard<std::mutex> lock(_lock);
ClientMap::iterator client_map_entry = _client_map.find(*client_key);
DCHECK(client_map_entry != _client_map.end());
ThriftClientImpl* info = client_map_entry->second;
std::unique_ptr<ThriftClientImpl>& info = client_map_entry->second;
ClientCacheMap::iterator j = _client_cache.find(make_network_address(info->ipaddress(), info->port()));
DCHECK(j != _client_cache.end());

if (_max_cache_size_per_host >= 0 && j->second.size() >= _max_cache_size_per_host) {
// cache of this host is full, close this client connection and remove if from _client_map
info->close();
_client_map.erase(*client_key);
delete info;

if (_metrics_enabled) {
_opened_clients->increment(-1);
Expand All @@ -171,10 +165,9 @@ void ClientCacheHelper::close_connections(const TNetworkAddress& hostport) {
for (void* client_key : client_keys) {
ClientMap::iterator client_map_entry = _client_map.find(client_key);
DCHECK(client_map_entry != _client_map.end());
ThriftClientImpl* info = client_map_entry->second;
std::unique_ptr<ThriftClientImpl>& info = client_map_entry->second;
info->close();
_client_map.erase(client_key);
delete info;
}
_client_cache.erase(cache_entry);
}
Expand Down
19 changes: 10 additions & 9 deletions be/src/runtime/client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class ClientCacheHelper {
~ClientCacheHelper();
// Callback method which produces a client object when one cannot be
// found in the cache. Supplied by the ClientCache wrapper.
typedef std::function<ThriftClientImpl*(const TNetworkAddress& hostport, void** client_key)> client_factory;
typedef std::function<std::unique_ptr<ThriftClientImpl>(const TNetworkAddress& hostport, void** client_key)>
client_factory;

// Return client for specific host/port in 'client'. If a client
// is not available, the client parameter is set to NULL.
Expand Down Expand Up @@ -104,11 +105,11 @@ class ClientCacheHelper {
std::mutex _lock;

// map from (host, port) to list of client keys for that address
typedef boost::unordered_map<TNetworkAddress, std::list<void*> > ClientCacheMap;
typedef boost::unordered_map<TNetworkAddress, std::list<void*>> ClientCacheMap;
ClientCacheMap _client_cache;

// Map from client key back to its associated ThriftClientImpl transport
typedef boost::unordered_map<void*, ThriftClientImpl*> ClientMap;
typedef boost::unordered_map<void*, std::unique_ptr<ThriftClientImpl>> ClientMap;
ClientMap _client_map;

// MetricRegistry
Expand Down Expand Up @@ -196,13 +197,13 @@ class ClientCache {
typedef ThriftClient<T> Client;

ClientCache()
: _client_factory(std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this,
std::placeholders::_1, std::placeholders::_2)) {}
: _client_factory(std::bind<std::unique_ptr<ThriftClientImpl>>(
std::mem_fn(&ClientCache::make_client), this, std::placeholders::_1, std::placeholders::_2)) {}

ClientCache(int max_cache_size)
: _client_cache_helper(max_cache_size),
_client_factory(std::bind<ThriftClientImpl*>(std::mem_fn(&ClientCache::make_client), this,
std::placeholders::_1, std::placeholders::_2)) {}
_client_factory(std::bind<std::unique_ptr<ThriftClientImpl>>(
std::mem_fn(&ClientCache::make_client), this, std::placeholders::_1, std::placeholders::_2)) {}

// Helper method which returns a debug string
std::string debug_string() { return _client_cache_helper.debug_string(); }
Expand Down Expand Up @@ -253,8 +254,8 @@ class ClientCache {
void release_client(T** client) { return _client_cache_helper.release_client(reinterpret_cast<void**>(client)); }

// Factory method to produce a new ThriftClient<T> for the wrapped cache
ThriftClientImpl* make_client(const TNetworkAddress& hostport, void** client_key) {
Client* client = new Client(hostport.hostname, hostport.port);
std::unique_ptr<ThriftClientImpl> make_client(const TNetworkAddress& hostport, void** client_key) {
std::unique_ptr<Client> client = std::make_unique<Client>(Client(hostport.hostname, hostport.port));
*client_key = reinterpret_cast<void*>(client->iface());
return client;
}
Expand Down
35 changes: 22 additions & 13 deletions be/src/runtime/data_stream_recvr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class DataStreamRecvr::SenderQueue {
public:
SenderQueue(DataStreamRecvr* parent_recvr, int num_senders);

~SenderQueue() = default;
~SenderQueue();

// Return the next batch form this sender queue. Sets the returned batch in _cur_batch.
// A returned batch that is not filled to capacity does *not* indicate
Expand Down Expand Up @@ -136,7 +136,7 @@ class DataStreamRecvr::SenderQueue {

// queue of (batch length, batch) pairs. The SenderQueue block owns memory to
// these batches. They are handed off to the caller via get_batch.
typedef list<pair<int, RowBatch*>> RowBatchQueue;
typedef list<pair<int, std::unique_ptr<RowBatch>>> RowBatchQueue;
RowBatchQueue _batch_queue;

typedef std::list<std::pair<int, ChunkUniquePtr>> ChunkQueue;
Expand Down Expand Up @@ -289,17 +289,23 @@ void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_n
return;
}

RowBatch* batch = nullptr;
std::unique_ptr<RowBatch> batch;
{
SCOPED_TIMER(_recvr->_deserialize_row_batch_timer);
// Note: if this function makes a row batch, the batch *must* be added
// to _batch_queue. It is not valid to create the row batch and destroy
// it in this thread.
batch = new RowBatch(_recvr->row_desc(), pb_batch);
batch = std::make_unique<RowBatch>(_recvr->row_desc(), pb_batch);
Status status = batch->init(pb_batch);
if (!status.ok()) {
LOG(WARNING) << "batch init failed, [cur_packet_id= " << iter->second << " receive_packet_id=" << packet_seq
<< "]";
return;
}
}

VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size << "\n";
_batch_queue.emplace_back(batch_size, batch);
_batch_queue.emplace_back(batch_size, std::move(batch));

// if done is nullptr, this function can't delay this response
if (done != nullptr && _recvr->exceeds_limit(batch_size)) {
Expand Down Expand Up @@ -532,14 +538,14 @@ void DataStreamRecvr::SenderQueue::close() {
_pending_closures.clear();
}

// Delete any batches queued in _batch_queue
for (auto& it : _batch_queue) {
delete it.second;
}

_batch_queue.clear();
_current_batch.reset();
}

DataStreamRecvr::SenderQueue::~SenderQueue() {
close();
}

Status DataStreamRecvr::create_merger(const SortExecExprs* exprs, const std::vector<bool>* is_asc,
const std::vector<bool>* is_null_first) {
DCHECK(_is_merging);
Expand Down Expand Up @@ -692,15 +698,18 @@ void DataStreamRecvr::close() {
for (auto& _sender_queue : _sender_queues) {
_sender_queue->close();
}
_sender_queues.clear();
// Remove this receiver from the DataStreamMgr that created it.
// TODO: log error msg
_mgr->deregister_recvr(fragment_instance_id(), dest_node_id());
_mgr = nullptr;
if (_mgr) {
_mgr->deregister_recvr(fragment_instance_id(), dest_node_id());
_mgr = nullptr;
}
_chunks_merger.reset();
}

DataStreamRecvr::~DataStreamRecvr() {
DCHECK(_mgr == nullptr) << "Must call close()";
close();
}

Status DataStreamRecvr::get_chunk(std::unique_ptr<vectorized::Chunk>* chunk) {
Expand Down
Loading

0 comments on commit b513f91

Please sign in to comment.