Skip to content

Commit

Permalink
flush parquet file
Browse files Browse the repository at this point in the history
Signed-off-by: Jiao Mingye <[email protected]>
  • Loading branch information
mxdzs0612 committed Apr 8, 2024
1 parent e9704de commit 473ad00
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,9 @@ CONF_mInt64(pk_dump_interval_seconds, "3600"); // 1 hour
// Min data processed when scaling connector sink writers, default value is the same as Trino
CONF_mInt64(writer_scaling_min_size_mb, "128");

// Memory threshold for Parquet writer before a flush, should be greater than 0 and less than 1
CONF_mDouble(parquet_writer_mem_usage_threshold, "0.8");

// whether enable query profile for queries initiated by spark or flink
CONF_mBool(enable_profile_for_external_plan, "false");

Expand Down
9 changes: 8 additions & 1 deletion be/src/formats/parquet/parquet_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "column/struct_column.h"
#include "column/vectorized_fwd.h"
#include "common/logging.h"
#include "exec/pipeline/query_context.h"
#include "exprs/expr.h"
#include "formats/file_writer.h"
#include "formats/utils.h"
Expand All @@ -48,7 +49,13 @@ std::future<Status> ParquetFileWriter::write(ChunkPtr chunk) {
if (auto status = _rowgroup_writer->write(chunk.get()); !status.ok()) {
return make_ready_future(std::move(status));
}
if (_rowgroup_writer->estimated_buffered_bytes() >= _writer_options->rowgroup_size) {
double mem_usage = 0.0;
if (_runtime_state != nullptr) {
mem_usage = static_cast<double>(_runtime_state->query_mem_tracker_ptr()->consumption()) /
_runtime_state->query_ctx()->get_static_query_mem_limit();
}
if (mem_usage > config::parquet_writer_mem_usage_threshold ||
_rowgroup_writer->estimated_buffered_bytes() >= _writer_options->rowgroup_size) {
return _flush_row_group();
}
return make_ready_future(Status::OK());
Expand Down

0 comments on commit 473ad00

Please sign in to comment.