fix(query): fix arrow_rs read variant type (databendlabs#13774)
b41sh authored Nov 21, 2023
1 parent c7f9771 commit 85db85f
Showing 14 changed files with 99 additions and 46 deletions.
4 changes: 1 addition & 3 deletions src/query/expression/src/convert_arrow_rs/array.rs
@@ -16,7 +16,6 @@ use std::sync::Arc;

use arrow_array::Array;
use arrow_schema::ArrowError;
use arrow_schema::Field;

use crate::Column;
use crate::DataField;
@@ -28,8 +27,7 @@ impl Column {
Ok(arrow_array)
}

pub fn from_arrow_rs(array: Arc<dyn Array>, field: &Field) -> Result<Self, ArrowError> {
let field = DataField::try_from(field)?;
pub fn from_arrow_rs(array: Arc<dyn Array>, field: &DataField) -> Result<Self, ArrowError> {
let arrow2_array: Box<dyn common_arrow::arrow::array::Array> = array.into();

Ok(Column::from_arrow(arrow2_array.as_ref(), field.data_type()))
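
The hunk above changes Column::from_arrow_rs to take a Databend DataField rather than an arrow-rs Field, so the logical type (e.g. Variant) is supplied by the caller instead of being re-derived from arrow metadata. A minimal usage sketch under that signature; the DataField::new call and the concrete field here are illustrative and not part of the commit:

use std::sync::Arc;

use arrow_array::Array;
use arrow_array::Int32Array;
use arrow_schema::ArrowError;
use common_expression::types::DataType;
use common_expression::types::NumberDataType;
use common_expression::Column;
use common_expression::DataField;

// Convert an arrow-rs array into a Databend Column, supplying the logical
// DataField explicitly instead of an arrow Field.
fn arrow_to_column() -> Result<Column, ArrowError> {
    let array: Arc<dyn Array> = Arc::new(Int32Array::from(vec![1, 2, 3]));
    // Illustrative field; real call sites take it from an existing DataSchema.
    let field = DataField::new("c", DataType::Number(NumberDataType::Int32));
    Column::from_arrow_rs(array, &field)
}
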
12 changes: 7 additions & 5 deletions src/query/expression/src/convert_arrow_rs/record_batch.rs
@@ -32,16 +32,18 @@ impl DataBlock {
RecordBatch::try_new(schema, arrays)
}

pub fn from_record_batch(batch: &RecordBatch) -> Result<(Self, DataSchema), ArrowError> {
let schema: DataSchema = DataSchema::try_from(&(*batch.schema()))?;
pub fn from_record_batch(
schema: &DataSchema,
batch: &RecordBatch,
) -> Result<(Self, DataSchema), ArrowError> {
if batch.num_columns() == 0 {
return Ok((DataBlock::new(vec![], batch.num_rows()), schema));
return Ok((DataBlock::new(vec![], batch.num_rows()), schema.clone()));
}

let mut columns = Vec::with_capacity(batch.columns().len());
for (array, field) in batch.columns().iter().zip(batch.schema().fields().iter()) {
for (array, field) in batch.columns().iter().zip(schema.fields().iter()) {
columns.push(Column::from_arrow_rs(array.clone(), field)?)
}
Ok((DataBlock::new_from_columns(columns), schema))
Ok((DataBlock::new_from_columns(columns), schema.clone()))
}
}
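
With this change DataBlock::from_record_batch takes the target DataSchema explicitly. A minimal sketch of the new call shape, assuming only the signature shown above; deriving the schema from the batch itself (as the UDF transform below does) is the fallback when no table schema is available:

use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use common_expression::DataBlock;
use common_expression::DataSchema;

// Convert an arrow-rs RecordBatch into a DataBlock with an explicit schema.
fn batch_to_block(batch: &RecordBatch) -> Result<DataBlock, ArrowError> {
    // Fallback used when no table schema is at hand: derive it from the batch.
    // Readers that know the real table schema pass that instead, so Variant and
    // Bitmap columns keep their Databend types.
    let schema = DataSchema::try_from(&(*batch.schema()))?;
    let (block, _schema) = DataBlock::from_record_batch(&schema, batch)?;
    Ok(block)
}
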
46 changes: 34 additions & 12 deletions src/query/expression/src/values.rs
@@ -1260,8 +1260,16 @@ impl Column {
let is_nullable = data_type.is_nullable();
let data_type = data_type.remove_nullable();
let column = match arrow_col.data_type() {
ArrowDataType::Null => Column::Null {
len: arrow_col.len(),
ArrowDataType::Null => match data_type {
DataType::EmptyArray => Column::EmptyArray {
len: arrow_col.len(),
},
DataType::EmptyMap => Column::EmptyMap {
len: arrow_col.len(),
},
_ => Column::Null {
len: arrow_col.len(),
},
},
ArrowDataType::Extension(name, _, _) if name == ARROW_EXT_TYPE_EMPTY_ARRAY => {
Column::EmptyArray {
@@ -1373,11 +1381,15 @@ impl Column {
let offsets = arrow_col.offsets().clone().into_inner();

let offsets = unsafe { std::mem::transmute::<Buffer<i64>, Buffer<u64>>(offsets) };
if data_type.is_variant() {
// Variant column from udf server is converted to LargeBinary, we restore it back here.
Column::Variant(StringColumn::new(arrow_col.values().clone(), offsets))
} else {
Column::String(StringColumn::new(arrow_col.values().clone(), offsets))
// LargeBinary may be Extension data type variant and bitmap
match data_type {
DataType::Variant => {
Column::Variant(StringColumn::new(arrow_col.values().clone(), offsets))
}
DataType::Bitmap => {
Column::Bitmap(StringColumn::new(arrow_col.values().clone(), offsets))
}
_ => Column::String(StringColumn::new(arrow_col.values().clone(), offsets)),
}
}
// TODO: deprecate it and use LargeBinary instead
@@ -1392,11 +1404,21 @@ impl Column {
.iter()
.map(|x| *x as u64)
.collect::<Vec<_>>();

Column::String(StringColumn::new(
arrow_col.values().clone(),
offsets.into(),
))
// Binary may be Extension data type variant and bitmap
match data_type {
DataType::Variant => Column::Variant(StringColumn::new(
arrow_col.values().clone(),
offsets.into(),
)),
DataType::Bitmap => Column::Bitmap(StringColumn::new(
arrow_col.values().clone(),
offsets.into(),
)),
_ => Column::String(StringColumn::new(
arrow_col.values().clone(),
offsets.into(),
)),
}
}

ArrowDataType::FixedSizeBinary(size) => {
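
On the values.rs hunks above: an arrow Null, Binary, or LargeBinary column is ambiguous on the Databend side (Null can also back EmptyArray/EmptyMap; the binary layouts back String, Variant, and Bitmap), which is why the expected DataType now drives the conversion. A self-contained sketch of that dispatch; the ColumnKind enum and helper are hypothetical, only the DataType variants come from the hunk:

use common_expression::types::DataType;

// The logical column kinds an arrow Binary/LargeBinary buffer may represent.
#[derive(Debug, PartialEq)]
enum ColumnKind {
    String,
    Variant,
    Bitmap,
}

// Pick the target kind from the expected Databend type, mirroring the match in
// Column::from_arrow above; anything other than Variant/Bitmap stays a String.
fn binary_column_kind(expected: &DataType) -> ColumnKind {
    match expected {
        DataType::Variant => ColumnKind::Variant,
        DataType::Bitmap => ColumnKind::Bitmap,
        _ => ColumnKind::String,
    }
}
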
@@ -95,8 +95,9 @@ impl AsyncTransform for TransformUdf {
.await?;
let result_batch = client.do_exchange(&func.func_name, input_batch).await?;

let (result_block, result_schema) = DataBlock::from_record_batch(&result_batch)
.map_err(|err| {
let schema = DataSchema::try_from(&(*result_batch.schema()))?;
let (result_block, result_schema) =
DataBlock::from_record_batch(&schema, &result_batch).map_err(|err| {
ErrorCode::UDFDataError(format!(
"Cannot convert arrow record batch to data block: {err}"
))

@@ -22,6 +22,7 @@ use common_catalog::plan::Projection;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::DataBlock;
use common_expression::DataSchema;
use common_expression::Evaluator;
use common_expression::Expr;
use common_expression::FunctionContext;
@@ -76,7 +77,8 @@ impl ParquetPredicate {
}

pub fn evaluate(&self, batch: &RecordBatch) -> Result<BooleanArray> {
let block = transform_record_batch(batch, &self.field_paths)?;
let data_schema = DataSchema::from(&self.schema);
let block = transform_record_batch(&data_schema, batch, &self.field_paths)?;
let res = self.evaluate_block(&block)?;
Ok(bitmap_to_boolean_array(res))
}

@@ -16,6 +16,7 @@ use std::sync::Arc;

use common_exception::Result;
use common_expression::DataBlock;
use common_expression::DataSchema;
use common_expression::TopKSorter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::arrow_reader::RowSelection;
@@ -33,6 +34,7 @@ use crate::parquet_rs::parquet_reader::utils::FieldPaths;

pub struct NoPretchPolicyBuilder {
projection: ProjectionMask,
data_schema: DataSchema,
field_levels: FieldLevels,
field_paths: Arc<Option<FieldPaths>>,
}
@@ -55,6 +57,7 @@ impl ReadPolicyBuilder for NoPretchPolicyBuilder {
)?;
Ok(Some(Box::new(NoPrefetchPolicy {
field_paths: self.field_paths.clone(),
data_schema: self.data_schema.clone(),
reader,
})))
}
@@ -63,12 +66,14 @@ impl NoPretchPolicyBuilder {
impl NoPretchPolicyBuilder {
pub fn create(
schema: &SchemaDescriptor,
data_schema: DataSchema,
projection: ProjectionMask,
field_paths: Arc<Option<FieldPaths>>,
) -> Result<Box<dyn ReadPolicyBuilder>> {
let field_levels = parquet_to_arrow_field_levels(schema, projection.clone(), None)?;
Ok(Box::new(NoPretchPolicyBuilder {
field_levels,
data_schema,
projection,
field_paths,
}))
@@ -87,6 +92,7 @@ pub struct NoPrefetchPolicy {
/// we should extract inner columns from the struct manually by traversing the nested column;
/// if `field_paths` is [None], we can skip the traversing.
field_paths: Arc<Option<FieldPaths>>,
data_schema: DataSchema,

reader: ParquetRecordBatchReader,
}
@@ -95,7 +101,7 @@ impl ReadPolicy for NoPrefetchPolicy {
fn read_block(&mut self) -> Result<Option<DataBlock>> {
let batch = self.reader.next().transpose()?;
if let Some(batch) = batch {
let block = transform_record_batch(&batch, &self.field_paths)?;
let block = transform_record_batch(&self.data_schema, &batch, &self.field_paths)?;
Ok(Some(block))
} else {
Ok(None)

@@ -170,6 +170,7 @@ impl ReadPolicyBuilder for PredicateAndTopkPolicyBuilder {
.await?;
// Topk column **must** not be in a nested column.
let block = read_all(
self.src_schema.as_ref(),
&row_group,
topk.field_levels(),
selection.clone(),
@@ -191,6 +192,7 @@ impl ReadPolicyBuilder for PredicateAndTopkPolicyBuilder {
.fetch(self.predicate.projection(), selection.as_ref())
.await?;
let block = read_all(
self.src_schema.as_ref(),
&row_group,
self.predicate.field_levels(),
selection.clone(),
@@ -289,7 +291,8 @@ impl ReadPolicy for PredicateAndTopkPolicy {
if let Some(batch) = batch {
debug_assert!(!self.prefetched.is_empty());
let prefetched = self.prefetched.pop_front().unwrap();
let mut block = transform_record_batch(&batch, &self.remain_field_paths)?;
let mut block =
transform_record_batch(self.src_schema.as_ref(), &batch, &self.remain_field_paths)?;
block.merge_block(prefetched);
let block = block.resort(&self.src_schema, &self.dst_schema)?;
Ok(Some(block))

@@ -136,6 +136,7 @@ impl ReadPolicyBuilder for TopkOnlyPolicyBuilder {
.fetch(self.topk.projection(), selection.as_ref())
.await?;
let block = read_all(
self.src_schema.as_ref(),
&row_group,
self.topk.field_levels(),
selection.clone(),
@@ -211,7 +212,8 @@ impl ReadPolicy for TopkOnlyPolicy {
debug_assert!(
self.prefetched.is_none() || !self.prefetched.as_ref().unwrap().is_empty()
);
let mut block = transform_record_batch(&batch, &self.remain_field_paths)?;
let mut block =
transform_record_batch(self.src_schema.as_ref(), &batch, &self.remain_field_paths)?;
if let Some(q) = self.prefetched.as_mut() {
let prefetched = q.pop_front().unwrap();
block.add_column(prefetched);

@@ -14,6 +14,7 @@

use common_exception::Result;
use common_expression::DataBlock;
use common_expression::DataSchema;
use common_expression::TopKSorter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::arrow_reader::RowSelection;
@@ -26,6 +27,7 @@ use crate::parquet_rs::parquet_reader::utils::transform_record_batch;
use crate::parquet_rs::parquet_reader::utils::FieldPaths;

pub fn read_all(
data_schema: &DataSchema,
rg: &InMemoryRowGroup,
field_levels: &FieldLevels,
selection: Option<RowSelection>,
@@ -36,7 +38,7 @@ pub fn read_all(
ParquetRecordBatchReader::try_new_with_row_groups(field_levels, rg, num_rows, selection)?;
let batch = reader.next().transpose()?.unwrap();
debug_assert!(reader.next().is_none());
transform_record_batch(&batch, field_paths)
transform_record_batch(data_schema, &batch, field_paths)
}

#[inline]

@@ -20,6 +20,7 @@ use common_catalog::plan::PushDownInfo;
use common_catalog::plan::TopK;
use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::DataSchema;
use common_expression::TableSchemaRef;
use opendal::Operator;
use parquet::arrow::arrow_to_parquet_schema;
@@ -204,8 +205,10 @@ impl<'a> ParquetRSReaderBuilder<'a> {
.map(|(proj, _, _, paths)| (proj.clone(), paths.clone()))
.unwrap();

let schema = Arc::new(DataSchema::from(&self.table_schema.as_ref()));
Ok(ParquetRSFullReader {
op: self.op.clone(),
schema,
predicate,
projection,
field_paths,
@@ -261,9 +264,11 @@ impl<'a> ParquetRSReaderBuilder<'a> {
}

fn create_no_prefetch_policy_builder(&self) -> Result<Box<dyn ReadPolicyBuilder>> {
let (projection, _, _, output_field_paths) = self.built_output.as_ref().unwrap();
let (projection, _, schema, output_field_paths) = self.built_output.as_ref().unwrap();
let data_schema = DataSchema::from(schema);
NoPretchPolicyBuilder::create(
&self.schema_desc,
data_schema,
projection.clone(),
output_field_paths.clone(),
)

@@ -18,6 +18,7 @@ use arrow_schema::ArrowError;
use bytes::Bytes;
use common_exception::Result;
use common_expression::DataBlock;
use common_expression::DataSchemaRef;
use common_metrics::storage::metrics_inc_omit_filter_rowgroups;
use common_metrics::storage::metrics_inc_omit_filter_rows;
use futures::StreamExt;
@@ -40,6 +41,7 @@ use crate::ParquetRSPruner;
/// The reader to read a whole parquet file.
pub struct ParquetRSFullReader {
pub(super) op: Operator,
pub(super) schema: DataSchemaRef,
pub(super) predicate: Option<Arc<ParquetPredicate>>,

/// Columns to output.
@@ -122,7 +124,7 @@ impl ParquetRSFullReader {
let record_batch = stream.next().await.transpose()?;

if let Some(batch) = record_batch {
let blocks = transform_record_batch(&batch, &self.field_paths)?;
let blocks = transform_record_batch(self.schema.as_ref(), &batch, &self.field_paths)?;
Ok(Some(blocks))
} else {
Ok(None)
@@ -175,7 +177,6 @@ impl ParquetRSFullReader {
)]));
}
}

let reader = builder.build()?;
// Write `if` outside iteration to reduce branches.
if let Some(field_paths) = self.field_paths.as_ref() {
@@ -191,7 +192,7 @@ impl ParquetRSFullReader {
.into_iter()
.map(|batch| {
let batch = batch?;
Ok(DataBlock::from_record_batch(&batch)?.0)
Ok(DataBlock::from_record_batch(self.schema.as_ref(), &batch)?.0)
})
.collect()
}