Merge pull request databendlabs#3718 from dantengsky/fix-3709

ISSUE-3709: table read performance tweaking

BohuTANG authored Dec 31, 2021
2 parents 3328a38 + 1849f76 commit 5ab5456
Showing 13 changed files with 216 additions and 34 deletions.
48 changes: 40 additions & 8 deletions common/streams/src/sources/source_parquet.rs
@@ -31,9 +31,13 @@ use common_exception::Result;
use common_tracing::tracing;
use common_tracing::tracing::debug_span;
use common_tracing::tracing::Instrument;
+use futures::io::BufReader;
use futures::StreamExt;
use futures::TryStreamExt;

+/// default buffer size of BufferedReader, 1MB
+const DEFAULT_READ_BUFFER_SIZE: u64 = 1024 * 1024;

use crate::Source;

pub struct ParquetSource {
@@ -46,6 +50,8 @@ pub struct ParquetSource {
row_group: usize,
row_groups: usize,
metadata: Option<FileMetaData>,
+file_len: Option<u64>,
+read_buffer_size: Option<u64>,
}

impl ParquetSource {
@@ -54,6 +60,26 @@ impl ParquetSource {
path: String,
table_schema: DataSchemaRef,
projection: Vec<usize>,
) -> Self {
+Self::with_hints(
+data_accessor,
+path,
+table_schema,
+projection,
+None,
+None,
+None,
+)
+}
+
+pub fn with_hints(
+data_accessor: Arc<dyn DataAccessor>,
+path: String,
+table_schema: DataSchemaRef,
+projection: Vec<usize>,
+metadata: Option<FileMetaData>,
+file_len: Option<u64>,
+read_buffer_size: Option<u64>,
+) -> Self {
let block_schema = Arc::new(table_schema.project(projection.clone()));
Self {
@@ -64,7 +90,9 @@ impl ParquetSource {
projection,
row_group: 0,
row_groups: 0,
-metadata: None,
+metadata,
+file_len,
+read_buffer_size,
}
}
}
@@ -73,23 +101,24 @@ impl ParquetSource {
impl Source for ParquetSource {
#[tracing::instrument(level = "debug", skip_all)]
async fn read(&mut self) -> Result<Option<DataBlock>> {
-let metadata = match self.metadata.clone() {
+let fetched_metadata;
+let metadata = match &self.metadata {
Some(m) => m,
None => {
let mut reader = self
.data_accessor
.get_input_stream(self.path.as_str(), None)?;
-let m = read_metadata_async(&mut reader)
+fetched_metadata = read_metadata_async(&mut reader)
.instrument(debug_span!("parquet_source_read_meta"))
.await
.map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
-self.metadata = Some(m.clone());
-self.row_groups = m.row_groups.len();
-self.row_group = 0;
-m
+&fetched_metadata
}
};

+self.row_groups = metadata.row_groups.len();
+self.row_group = 0;
+
if self.row_group >= self.row_groups {
return Ok(None);
}
@@ -102,13 +131,16 @@ impl Source for ParquetSource {
.map(|idx| (metadata.row_groups[row_group].column(idx).clone(), idx));

let fields = self.arrow_table_schema.fields();
+let stream_len = self.file_len;
+let read_buffer_size = self.read_buffer_size.unwrap_or(DEFAULT_READ_BUFFER_SIZE);

let stream = futures::stream::iter(cols).map(|(col_meta, idx)| {
let data_accessor = self.data_accessor.clone();
let path = self.path.clone();

async move {
-let mut reader = data_accessor.get_input_stream(path.as_str(), None)?;
+let reader = data_accessor.get_input_stream(path.as_str(), stream_len)?;
+let mut reader = BufReader::with_capacity(read_buffer_size as usize, reader);
// TODO cache block column
let col_pages =
get_page_stream(&col_meta, &mut reader, vec![], Arc::new(|_, _| true))
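The heart of this file's change is twofold: the known part length is passed down so get_input_stream no longer needs to look up the object size, and the raw storage stream is wrapped in futures::io::BufReader, so the many small reads issued by the parquet page decoder drain an in-memory buffer instead of hitting the DAL stream one by one. A minimal sketch of that buffering pattern, using only the futures crate (read_prefix and the Cursor-backed usage are illustrative stand-ins, not part of the patch):

    use futures::io::{AsyncRead, AsyncReadExt, BufReader, Cursor};

    /// Mirrors DEFAULT_READ_BUFFER_SIZE above: 1 MB.
    const DEFAULT_READ_BUFFER_SIZE: u64 = 1024 * 1024;

    /// `hint` plays the role of the `read_buffer_size` setting.
    async fn read_prefix<R: AsyncRead + Unpin>(
        raw: R,
        hint: Option<u64>,
    ) -> std::io::Result<Vec<u8>> {
        let cap = hint.unwrap_or(DEFAULT_READ_BUFFER_SIZE) as usize;
        // Small, frequent reads are now served from this buffer; the
        // underlying stream sees at most one large read per `cap` bytes.
        let mut reader = BufReader::with_capacity(cap, raw);
        let mut prefix = vec![0u8; 16];
        reader.read_exact(&mut prefix).await?;
        Ok(prefix)
    }

    fn main() -> std::io::Result<()> {
        let bytes =
            futures::executor::block_on(read_prefix(Cursor::new(vec![7u8; 1024]), None))?;
        assert_eq!(bytes, vec![7u8; 16]);
        Ok(())
    }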
3 changes: 2 additions & 1 deletion query/src/sessions/settings.rs
@@ -32,7 +32,8 @@ impl Settings {
("flight_client_timeout", u64, 60, "Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds"),
("min_distributed_rows", u64, 100000000, "Minimum distributed read rows. In cluster mode, when read rows exceeds this value, the local table converted to distributed query."),
("min_distributed_bytes", u64, 500 * 1024 * 1024, "Minimum distributed read bytes. In cluster mode, when read bytes exceeds this value, the local table converted to distributed query."),
("parallel_read_threads", u64, 1, "The maximum number of parallelism for reading data. By default, it is 1.")
("parallel_read_threads", u64, 1, "The maximum number of parallelism for reading data. By default, it is 1."),
("storage_read_buffer_size", u64, 1024 * 1024, "The size of buffer in bytes for buffered reader of dal, default value is 1MB")
}

pub fn try_create() -> Result<Arc<Settings>> {
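The new setting reaches ParquetSource only as an Option<u64> hint, so callers that pass None still get the 1 MB default. A tiny sketch of that fallback contract (effective_buffer_size is a hypothetical helper, not in the patch):

    const DEFAULT_READ_BUFFER_SIZE: u64 = 1024 * 1024;

    fn effective_buffer_size(hint: Option<u64>) -> usize {
        // Same `unwrap_or` fallback as in source_parquet.rs above.
        hint.unwrap_or(DEFAULT_READ_BUFFER_SIZE) as usize
    }

    fn main() {
        assert_eq!(effective_buffer_size(None), 1024 * 1024);
        assert_eq!(effective_buffer_size(Some(64 * 1024)), 64 * 1024);
    }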
1 change: 1 addition & 0 deletions query/src/storages/fuse/meta/block.rs
@@ -24,6 +24,7 @@ pub struct BlockMeta {
/// Pointer of the data Block
pub row_count: u64,
pub block_size: u64,
+pub file_size: u64,
pub col_stats: HashMap<ColumnId, ColumnStatistics>,
pub location: BlockLocation,
}
2 changes: 1 addition & 1 deletion query/src/storages/fuse/mod.rs
@@ -16,7 +16,7 @@ pub mod cache;
mod constants;
pub mod io;
pub mod meta;
-mod operations;
+pub mod operations;
pub mod pruning;
pub mod statistics;
mod table;
2 changes: 2 additions & 0 deletions query/src/storages/fuse/operations/mod.rs
@@ -16,9 +16,11 @@ mod append;
mod commit;
mod operation_log;
mod optimize;
+mod part_info;
mod read;
mod read_plan;
mod truncate;

pub use operation_log::AppendOperationLogEntry;
pub use operation_log::TableOperationLog;
+pub use part_info::PartInfo;
75 changes: 75 additions & 0 deletions query/src/storages/fuse/operations/part_info.rs
@@ -0,0 +1,75 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use common_exception::ErrorCode;
use common_exception::Result;

/// Holds the location and length information of a given [Part].
///
/// It is intended to be encoded as a string and assigned to the field `name` of a given [`Part`],
/// then passed around to the `read` method of [FuseTable], where the `name` field is decoded back into a [PartInfo].
///
/// [Part]: common_planners::Part
/// [FuseTable]: crate::storages::fuse::FuseTable
///
#[derive(PartialEq, Debug)]
pub struct PartInfo<'a>(&'a str, u64);

impl<'a> PartInfo<'a> {
#[inline]
pub fn new(location: &'a str, length: u64) -> Self {
Self {
0: location,
1: length,
}
}

#[inline]
pub fn location(&self) -> &str {
self.0
}

#[inline]
pub fn length(&self) -> u64 {
self.1
}

#[inline]
pub fn decode(part_name: &'a str) -> Result<PartInfo> {
let parts = part_name.split('-').collect::<Vec<_>>();
if parts.len() != 2 {
return Err(ErrorCode::LogicalError(format!(
"invalid format of `Part.name` , expects 'name-length', got {}",
part_name
)));
}
let part_location = parts[0];
let part_len = parts[1].parse::<u64>().map_err(|e| {
ErrorCode::LogicalError(format!(
"invalid format of `Part.name` format, expects number for length', but got {}, {}",
parts[1], e
))
})?;
Ok(Self {
0: part_location,
1: part_len,
})
}

#[inline]
pub fn encode(&self) -> String {
format!("{}-{}", self.0, self.1,)
}
}
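Since the same string is built at planning time (read_plan.rs below) and parsed again at read time, the encode/decode pair must round-trip losslessly. A test-style sketch (the block path is made up; note that decode splits on '-', so the scheme assumes locations themselves contain no hyphen):

    use common_exception::Result;

    fn part_info_round_trip() -> Result<()> {
        let origin = PartInfo::new("_b/block_0001.parquet", 4096);
        let name = origin.encode(); // "_b/block_0001.parquet-4096"
        let decoded = PartInfo::decode(&name)?;
        assert_eq!(origin, decoded);
        assert_eq!(decoded.location(), "_b/block_0001.parquet");
        assert_eq!(decoded.length(), 4096);
        Ok(())
    }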
51 changes: 34 additions & 17 deletions query/src/storages/fuse/operations/read.rs
@@ -25,6 +25,7 @@ use common_streams::Source;
use common_tracing::tracing_futures::Instrument;
use futures::StreamExt;

+use super::part_info::PartInfo;
use crate::sessions::QueryContext;
use crate::storages::fuse::FuseTable;

@@ -35,20 +36,16 @@ impl FuseTable {
ctx: Arc<QueryContext>,
push_downs: &Option<Extras>,
) -> Result<SendableDataBlockStream> {
-let default_proj = || {
-(0..self.table_info.schema().fields().len())
-.into_iter()
-.collect::<Vec<usize>>()
-};

let projection = if let Some(Extras {
projection: Some(prj),
..
}) = push_downs
{
prj.clone()
} else {
-default_proj()
+(0..self.table_info.schema().fields().len())
+.into_iter()
+.collect::<Vec<usize>>()
};

let bite_size = ctx.get_settings().get_parallel_read_threads()?;
@@ -68,24 +65,44 @@

let part_stream = futures::stream::iter(iter);

// fallback to stream combinator from async_stream, since
// 1. when using `stream!`, the trace always contains an unclosed call-span (of ParquetSource::read)
// 2. later, when `bite_size` is larger than one, the async reads could be buffered

+let read_buffer_size = ctx.get_settings().get_storage_read_buffer_size()?;
let stream = part_stream
.map(move |part| {
let da = da.clone();
let table_schema = table_schema.clone();
let projection = projection.clone();
async move {
-let mut source =
-ParquetSource::new(da, part.name.clone(), table_schema, projection);
-source.read().await?.ok_or_else(|| {
-ErrorCode::ParquetError(format!("fail to read block {}", part.name))
-})
+let part_info = PartInfo::decode(&part.name)?;
+let part_location = part_info.location();
+let part_len = part_info.length();
+
+let mut source = ParquetSource::with_hints(
+da,
+part_info.location().to_owned(),
+table_schema,
+projection,
+None, // TODO cache parquet meta
+Some(part_len),
+Some(read_buffer_size),
+);
+source
+.read()
+.await
+.map_err(|e| {
+ErrorCode::ParquetError(format!(
+"fail to read block {}, {}",
+part_location, e
+))
+})?
+.ok_or_else(|| {
+ErrorCode::ParquetError(format!(
+"reader returns None for block {}",
+part_location,
+))
+})
}
})
-.buffered(bite_size as usize) // buffer_unordered?
+.buffer_unordered(bite_size as usize)
.instrument(common_tracing::tracing::Span::current());
Ok(Box::pin(stream))
}
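The buffered → buffer_unordered switch changes only delivery order: both drive up to bite_size part-reads concurrently, but buffered yields results in input order, so one slow part stalls every finished part queued behind it, while buffer_unordered hands each block over as soon as it completes. A standalone illustration, assuming a tokio runtime (the sleep durations stand in for per-part read latency):

    use futures::{stream, StreamExt};
    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let reads = stream::iter(vec![30u64, 10, 20]).map(|ms| async move {
            // Stand-in for reading and decoding one part.
            tokio::time::sleep(Duration::from_millis(ms)).await;
            ms
        });
        // `.buffered(3)` would yield [30, 10, 20] (input order);
        // `.buffer_unordered(3)` yields in completion order instead.
        let done: Vec<u64> = reads.buffer_unordered(3).collect().await;
        assert_eq!(done, vec![10, 20, 30]);
    }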
9 changes: 4 additions & 5 deletions query/src/storages/fuse/operations/read_plan.rs
@@ -24,6 +24,7 @@ use common_planners::Statistics;

use crate::sessions::QueryContext;
use crate::storages::fuse::meta::BlockMeta;
+use crate::storages::fuse::operations::part_info::PartInfo;
use crate::storages::fuse::pruning::apply_block_pruning;
use crate::storages::fuse::FuseTable;

@@ -57,13 +58,11 @@ impl FuseTable {
blocks_metas.iter().fold(
(Statistics::default(), Partitions::default()),
|(mut stats, mut parts), block_meta| {
-parts.push(Part {
-name: block_meta.location.path.clone(),
-version: 0,
-});
+let name =
+PartInfo::new(block_meta.location.path.as_str(), block_meta.file_size).encode();
+parts.push(Part { name, version: 0 });

stats.read_rows += block_meta.row_count as usize;

match &proj_cols {
Some(proj) => {
stats.read_bytes += block_meta
1 change: 1 addition & 0 deletions query/src/storages/fuse/statistics/accumulator.rs
@@ -120,6 +120,7 @@ impl PartiallyAccumulated {
},
row_count: self.block_row_count,
block_size: self.block_size,
+file_size,
col_stats: self.block_column_statistics,
};
stats.blocks_metas.push(block_meta);
4 changes: 2 additions & 2 deletions query/tests/it/interpreters/interpreter_interceptor.rs
@@ -66,8 +66,8 @@ async fn test_interpreter_interceptor() -> Result<()> {
"+----------+--------------+-----------+--------------+-----------+------------+--------------+---------------+-------------+--------------+------------+------------------------------------------------------+-----------+-----------------------------+---------------------------------------------------------------------------+",
"| log_type | handler_type | cpu_usage | memory_usage | scan_rows | scan_bytes | written_rows | written_bytes | result_rows | result_bytes | query_kind | query_text | sql_user | sql_user_quota | sql_user_privileges |",
"+----------+--------------+-----------+--------------+-----------+------------+--------------+---------------+-------------+--------------+------------+------------------------------------------------------+-----------+-----------------------------+---------------------------------------------------------------------------+",
"| 1 | TestSession | 8 | 3797 | 0 | 0 | 0 | 0 | 0 | 0 | SelectPlan | select number from numbers_mt(100) where number > 90 | test_user | UserGrantSet { grants: [] } | UserQuota { max_cpu: 0, max_memory_in_bytes: 0, max_storage_in_bytes: 0 } |",
"| 2 | TestSession | 8 | 3797 | 100 | 800 | 0 | 0 | 9 | 72 | SelectPlan | select number from numbers_mt(100) where number > 90 | test_user | UserGrantSet { grants: [] } | UserQuota { max_cpu: 0, max_memory_in_bytes: 0, max_storage_in_bytes: 0 } |",
"| 1 | TestSession | 8 | 4065 | 0 | 0 | 0 | 0 | 0 | 0 | SelectPlan | select number from numbers_mt(100) where number > 90 | test_user | UserGrantSet { grants: [] } | UserQuota { max_cpu: 0, max_memory_in_bytes: 0, max_storage_in_bytes: 0 } |",
"| 2 | TestSession | 8 | 4065 | 100 | 800 | 0 | 0 | 9 | 72 | SelectPlan | select number from numbers_mt(100) where number > 90 | test_user | UserGrantSet { grants: [] } | UserQuota { max_cpu: 0, max_memory_in_bytes: 0, max_storage_in_bytes: 0 } |",
"+----------+--------------+-----------+--------------+-----------+------------+--------------+---------------+-------------+--------------+------------+------------------------------------------------------+-----------+-----------------------------+---------------------------------------------------------------------------+",

];
1 change: 1 addition & 0 deletions query/tests/it/storages/fuse/operations/mod.rs
@@ -14,6 +14,7 @@
//

mod optimize;
+mod part_info;
mod purge_drop;
mod purge_truncate;
mod read_plan;