Skip to content

Commit

Permalink
[Enhancement] Improve orc chunk reader performance (StarRocks#13779)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smith-Cruise authored Nov 24, 2022
1 parent 319d17b commit be97f6b
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 70 deletions.
70 changes: 42 additions & 28 deletions be/src/formats/orc/orc_chunk_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1265,8 +1265,10 @@ Status OrcChunkReader::_init_include_columns(const std::unique_ptr<OrcMapping>&
build_column_name_to_id_mapping(&_name_to_column_id, _hive_column_names, _reader->getType(), _case_sensitive);

std::list<uint64_t> include_column_id;

// NOTICE: No need to explicit include root column id, otherwise it will read out all fields.
// Include root column id.
include_column_id.emplace_back(0);
// include_column_id.emplace_back(0);

for (size_t i = 0; i < _src_slot_descriptors.size(); i++) {
SlotDescriptor* desc = _src_slot_descriptors[i];
Expand All @@ -1282,7 +1284,7 @@ Status OrcChunkReader::_init_include_columns(const std::unique_ptr<OrcMapping>&
for (size_t pos : _lazy_load_ctx->lazy_load_indices) {
SlotDescriptor* desc = _src_slot_descriptors[pos];
if (desc == nullptr) continue;
RETURN_IF_ERROR(mapping->set_include_column_id(pos, desc->type(), &lazy_load_column_id));
RETURN_IF_ERROR(mapping->set_lazyload_column_id(pos, &lazy_load_column_id));
}

_row_reader_options.includeLazyLoadColumnIndexes(lazy_load_column_id);
Expand All @@ -1291,6 +1293,14 @@ Status OrcChunkReader::_init_include_columns(const std::unique_ptr<OrcMapping>&
return Status::OK();
}

const std::vector<bool>& OrcChunkReader::TEST_get_selected_column_id_list() {
return _row_reader->getSelectedColumns();
}

const std::vector<bool>& OrcChunkReader::TEST_get_lazyload_column_id_list() {
return _row_reader->getLazyLoadColumns();
}

Status OrcChunkReader::init(std::unique_ptr<orc::Reader> reader) {
_reader = std::move(reader);
// ORC writes empty schema (struct<>) to ORC files containing zero rows.
Expand All @@ -1299,6 +1309,11 @@ Status OrcChunkReader::init(std::unique_ptr<orc::Reader> reader) {
return Status::EndOfFile("number of rows is 0");
}

if (_hive_column_names == nullptr) {
// If hive_column_names is nullptr, we have to use orc's column name.
set_use_orc_column_names(true);
}

// ensure search argument is not null.
// we are going to put row reader filter into search argument applier
// and search argument applier only be constructed when search argument is not null.
Expand All @@ -1308,12 +1323,13 @@ Status OrcChunkReader::init(std::unique_ptr<orc::Reader> reader) {
_row_reader_options.searchArgument(builder->build());
}
// Build root_mapping, including all columns in orc.
const std::unique_ptr<OrcMapping> root_mapping =
OrcMappingFactory::build_mapping(_src_slot_descriptors, _reader->getType(), _case_sensitive);
if (root_mapping == nullptr) {
return Status::InternalError("Build orc root mapping failed.");
}
std::unique_ptr<OrcMapping> root_mapping = nullptr;
ASSIGN_OR_RETURN(root_mapping,
OrcMappingFactory::build_mapping(_src_slot_descriptors, _reader->getType(), _case_sensitive,
_use_orc_column_names, _hive_column_names));
DCHECK(root_mapping != nullptr);
RETURN_IF_ERROR(_init_include_columns(root_mapping));
RETURN_IF_ERROR(_init_src_types(root_mapping));
try {
_row_reader = _reader->createRowReader(_row_reader_options);
} catch (std::exception& e) {
Expand All @@ -1324,23 +1340,21 @@ Status OrcChunkReader::init(std::unique_ptr<orc::Reader> reader) {
}
// Build selected column mapping, because after `include column operation`, all included column's column id
// will re-assign.
_root_selected_mapping =
OrcMappingFactory::build_mapping(_src_slot_descriptors, _reader->getType(), _case_sensitive);
if (_root_selected_mapping == nullptr) {
return Status::InternalError("Build orc root selected mapping failed.");
}
ASSIGN_OR_RETURN(_root_selected_mapping,
OrcMappingFactory::build_mapping(_src_slot_descriptors, _reader->getType(), _case_sensitive,
_use_orc_column_names, _hive_column_names));
DCHECK(_root_selected_mapping != nullptr);
// TODO(SmithCruise) delete _init_position_in_orc() when develop subfield lazy load.
RETURN_IF_ERROR(_init_position_in_orc());
RETURN_IF_ERROR(_init_src_types());
RETURN_IF_ERROR(_init_cast_exprs());
RETURN_IF_ERROR(_init_fill_functions());
return Status::OK();
}

Status OrcChunkReader::_init_position_in_orc() {
int column_size = _src_slot_descriptors.size();
_position_in_orc.clear();
_position_in_orc.resize(column_size);
std::vector<int> position_in_orc;
position_in_orc.resize(column_size);
_slot_id_to_position.clear();

std::unordered_map<int, int> column_id_to_pos;
Expand Down Expand Up @@ -1372,20 +1386,20 @@ Status OrcChunkReader::_init_position_in_orc() {
return Status::NotFound(s);
}
int pos = it2->second;
_position_in_orc[i] = pos;
position_in_orc[i] = pos;
SlotId id = slot_desc->id();
_slot_id_to_position[id] = pos;
}

if (_lazy_load_ctx != nullptr) {
for (int i = 0; i < _lazy_load_ctx->active_load_slots.size(); i++) {
int src_index = _lazy_load_ctx->active_load_indices[i];
int pos = _position_in_orc[src_index];
int pos = position_in_orc[src_index];
_lazy_load_ctx->active_load_orc_positions[i] = pos;
}
for (int i = 0; i < _lazy_load_ctx->lazy_load_slots.size(); i++) {
int src_index = _lazy_load_ctx->lazy_load_indices[i];
int pos = _position_in_orc[src_index];
int pos = position_in_orc[src_index];
_lazy_load_ctx->lazy_load_orc_positions[i] = pos;
}
}
Expand All @@ -1411,12 +1425,12 @@ static Status _create_type_descriptor_by_orc(const TypeDescriptor& origin_type,
// assign selected_fields information
result->selected_fields = origin_type.selected_fields;
DCHECK_EQ(0, result->children.size());
result->children.emplace_back();

TypeDescriptor& key_type = result->children.back();
TypeDescriptor& key_type = result->children.emplace_back();
RETURN_IF_ERROR(_create_type_descriptor_by_orc(origin_type.children.at(0), orc_type->getSubtype(0),
mapping->get_column_id_or_child_mapping(0).orc_mapping,
&key_type));

result->children.emplace_back();
TypeDescriptor& value_type = result->children.back();
RETURN_IF_ERROR(_create_type_descriptor_by_orc(origin_type.children.at(1), orc_type->getSubtype(1),
Expand Down Expand Up @@ -1495,7 +1509,7 @@ static void _try_implicit_cast(TypeDescriptor* from, const TypeDescriptor& to) {
}
}

Status OrcChunkReader::_init_src_types() {
Status OrcChunkReader::_init_src_types(const std::unique_ptr<OrcMapping>& mapping) {
int column_size = _src_slot_descriptors.size();
// update source types.
_src_types.clear();
Expand All @@ -1505,11 +1519,10 @@ Status OrcChunkReader::_init_src_types() {
if (slot_desc == nullptr) {
continue;
}
int pos_of_orc = _position_in_orc[i];
const orc::Type* orc_type = _row_reader->getSelectedType().getSubtype(pos_of_orc);
const orc::Type* orc_type =
_reader->getType().getSubtypeByColumnId(mapping->get_column_id_or_child_mapping(i).orc_column_id);
RETURN_IF_ERROR(_create_type_descriptor_by_orc(
slot_desc->type(), orc_type, _root_selected_mapping->get_column_id_or_child_mapping(i).orc_mapping,
&_src_types[i]));
slot_desc->type(), orc_type, mapping->get_column_id_or_child_mapping(i).orc_mapping, &_src_types[i]));
_try_implicit_cast(&_src_types[i], slot_desc->type());
}
return Status::OK();
Expand Down Expand Up @@ -1577,7 +1590,6 @@ OrcChunkReader::~OrcChunkReader() {
_src_types.clear();
_slot_id_to_desc.clear();
_slot_id_to_position.clear();
_position_in_orc.clear();
_cast_exprs.clear();
_fill_functions.clear();
}
Expand Down Expand Up @@ -1607,7 +1619,7 @@ Status OrcChunkReader::_fill_chunk(ChunkPtr* chunk, const std::vector<SlotDescri
const std::vector<int>* indices) {
int column_size = src_slot_descriptors.size();
DCHECK_GT(_batch->numElements, 0);
const auto& batch_vec = down_cast<orc::StructVectorBatch*>(_batch.get())->fields;
const auto& batch_vec = down_cast<orc::StructVectorBatch*>(_batch.get());
if (_broker_load_mode) {
// always allocate load filter. it's much easier to use in fill chunk function.
if (_broker_load_filter == nullptr) {
Expand All @@ -1625,7 +1637,9 @@ Status OrcChunkReader::_fill_chunk(ChunkPtr* chunk, const std::vector<SlotDescri
src_index = (*indices)[src_index];
}
set_current_slot(slot_desc);
orc::ColumnVectorBatch* cvb = batch_vec[_position_in_orc[src_index]];
orc::ColumnVectorBatch* cvb =
batch_vec->fieldsColumnIdMap[_root_selected_mapping->get_column_id_or_child_mapping(src_index)
.orc_column_id];
if (!slot_desc->is_nullable() && cvb->hasNulls) {
if (_broker_load_mode) {
std::string error_msg =
Expand Down
25 changes: 21 additions & 4 deletions be/src/formats/orc/orc_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,17 @@ class OrcChunkReader {
const cctz::time_zone& tzinfo() { return _tzinfo; }
void drop_nanoseconds_in_datetime() { _drop_nanoseconds_in_datetime = true; }
bool use_nanoseconds_in_datetime() { return !_drop_nanoseconds_in_datetime; }
void set_use_orc_column_names(bool use_orc_column_names) { _use_orc_column_names = use_orc_column_names; }
// methods related to broker load.
void set_broker_load_mode(bool strict_mode) {
_broker_load_mode = true;
_strict_mode = strict_mode;
set_use_orc_column_names(true);
}
void disable_broker_load_mode() {
_broker_load_mode = false;
set_use_orc_column_names(false);
}
void disable_broker_load_mode() { _broker_load_mode = false; }
size_t get_num_rows_filtered() const { return _num_rows_filtered; }
bool get_broker_load_mode() const { return _broker_load_mode; }
bool get_strict_mode() const { return _strict_mode; }
Expand Down Expand Up @@ -140,10 +145,17 @@ class OrcChunkReader {
orc::RowReaderOptions _row_reader_options;
std::vector<SlotDescriptor*> _src_slot_descriptors;
std::unordered_map<SlotId, SlotDescriptor*> _slot_id_to_desc;

// Access ORC columns by name. By default,
// columns in ORC files are accessed by their ordinal position in the Hive table definition.
// Only affect first level behavior, about struct subfield, we still accessed by subfield name rather than position.
// This value now is fixed, in future, it can be passed from FE.
// NOTICE: In broker mode, this value will be set true.
// We make the same behavior as Trino & Presto.
// https://trino.io/docs/current/connector/hive.html?highlight=hive#orc-format-configuration-properties
bool _use_orc_column_names = false;
std::unique_ptr<OrcMapping> _root_selected_mapping;
std::vector<TypeDescriptor> _src_types;
// _src_slot index to position in orc
std::vector<int> _position_in_orc;
// slot id to position in orc.
std::unordered_map<SlotId, int> _slot_id_to_position;
std::vector<Expr*> _cast_exprs;
Expand All @@ -153,7 +165,7 @@ class OrcChunkReader {
std::string* orc_column_name);
Status _init_include_columns(const std::unique_ptr<OrcMapping>& mapping);
Status _init_position_in_orc();
Status _init_src_types();
Status _init_src_types(const std::unique_ptr<OrcMapping>& mapping);
Status _init_cast_exprs();
Status _init_fill_functions();
// holding Expr* in cast_exprs;
Expand All @@ -163,6 +175,11 @@ class OrcChunkReader {
int64_t _tzoffset_in_seconds;
bool _drop_nanoseconds_in_datetime;

// Only used for UT, used after init reader
const std::vector<bool>& TEST_get_selected_column_id_list();
// Only used for UT, used after init reader
const std::vector<bool>& TEST_get_lazyload_column_id_list();

// fields related to broker load.
bool _broker_load_mode;
bool _strict_mode;
Expand Down
Loading

0 comments on commit be97f6b

Please sign in to comment.