Skip to content

Commit

Permalink
Adds `WindowUDFImpl::reverse_expr` trait method + Support for `IGNORE …
Browse files Browse the repository at this point in the history
…NULLS` (apache#12662)

* Adds method for reversing a user-defined window function

* Adds support for `IGNORE NULLS`

* Adds doc comments for `reverse_expr`

* Minor: copy edit for doc comment

* Adds doc comments for `WindowUDFExpr` fields
  • Loading branch information
jcsherin authored Sep 29, 2024
1 parent c4b48d7 commit a0a635a
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 5 deletions.
2 changes: 1 addition & 1 deletion datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub use sqlparser;
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::{AggregateUDF, AggregateUDFImpl, ReversedUDAF};
pub use udf::{ScalarUDF, ScalarUDFImpl};
pub use udwf::{WindowUDF, WindowUDFImpl};
pub use udwf::{ReversedUDWF, WindowUDF, WindowUDFImpl};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};

#[cfg(test)]
Expand Down
26 changes: 26 additions & 0 deletions datafusion/expr/src/udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ impl WindowUDF {
pub fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
// Forwards `arg_types` unchanged to `self.inner` and returns its result as-is.
self.inner.coerce_types(arg_types)
}

/// Describes how this user-defined window function behaves when the
/// order of evaluation is reversed.
///
/// This forwards to `self.inner` and returns its [`ReversedUDWF`]
/// unchanged. The value tells the caller whether the function yields
/// an identical result in reverse, does not support reversal, or
/// supplies a different function to evaluate instead.
///
/// See [`WindowUDFImpl::reverse_expr`] for more details.
pub fn reverse_expr(&self) -> ReversedUDWF {
self.inner.reverse_expr()
}
}

impl<F> From<F> for WindowUDF
Expand Down Expand Up @@ -351,6 +359,24 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
// Default body: the argument types are ignored and the outcome is whatever
// `not_impl_err!` produces, with a message that names this function via `self.name()`.
not_impl_err!("Function {} does not implement coerce_types", self.name())
}

/// Allows customizing the behavior of the user-defined window
/// function when it is evaluated in reverse order.
///
/// The default implementation returns [`ReversedUDWF::NotSupported`].
/// Override this method to return [`ReversedUDWF::Identical`] when the
/// result is the same in either order, or [`ReversedUDWF::Reversed`]
/// carrying the window function to evaluate instead.
fn reverse_expr(&self) -> ReversedUDWF {
ReversedUDWF::NotSupported
}
}

/// Describes how a user-defined window function behaves when the order
/// of evaluation is reversed.
///
/// This is the return type of [`WindowUDFImpl::reverse_expr`] and
/// [`WindowUDF::reverse_expr`].
pub enum ReversedUDWF {
/// The result of evaluating the user-defined window function
/// remains identical when reversed.
Identical,
/// A window function which does not support evaluating the result
/// in reverse order.
NotSupported,
/// Customize the user-defined window function for evaluating the
/// result in reverse order. The wrapped [`WindowUDF`] is the function
/// to use in place of the original one.
Reversed(Arc<WindowUDF>),
}

impl PartialEq for dyn WindowUDFImpl {
Expand Down
28 changes: 24 additions & 4 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use datafusion_common::{
exec_datafusion_err, exec_err, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::{
BuiltInWindowFunction, PartitionEvaluator, WindowFrame, WindowFunctionDefinition,
WindowUDF,
BuiltInWindowFunction, PartitionEvaluator, ReversedUDWF, WindowFrame,
WindowFunctionDefinition, WindowUDF,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::equivalence::collapse_lex_req;
Expand Down Expand Up @@ -130,7 +130,7 @@ pub fn create_window_expr(
}
// TODO: Ordering not supported for Window UDFs yet
WindowFunctionDefinition::WindowUDF(fun) => Arc::new(BuiltInWindowExpr::new(
create_udwf_window_expr(fun, args, input_schema, name)?,
create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?,
partition_by,
order_by,
window_frame,
Expand Down Expand Up @@ -329,6 +329,7 @@ fn create_udwf_window_expr(
args: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
name: String,
ignore_nulls: bool,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
// need to get the types into an owned vec for some reason
let input_types: Vec<_> = args
Expand All @@ -341,6 +342,8 @@ fn create_udwf_window_expr(
args: args.to_vec(),
input_types,
name,
is_reversed: false,
ignore_nulls,
}))
}

Expand All @@ -353,6 +356,12 @@ struct WindowUDFExpr {
name: String,
/// Types of input expressions
input_types: Vec<DataType>,
/// This is set to `true` only if the user-defined window function
/// expression supports evaluation in reverse order, and the
/// evaluation order is reversed.
is_reversed: bool,
/// Set to `true` if `IGNORE NULLS` is defined, `false` otherwise.
ignore_nulls: bool,
}

impl BuiltInWindowFunctionExpr for WindowUDFExpr {
Expand All @@ -378,7 +387,18 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
}

/// Maps the answer of the wrapped UDWF's `reverse_expr` onto the
/// `Option` returned here: `Some` with the expression to evaluate in
/// reverse order, or `None` when reversal is not supported.
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
// NOTE(review): the bare `None` below appears to be the pre-change body that
// the diff view renders next to the new `match`; confirm against the committed file.
None
match self.fun.reverse_expr() {
// The same expression is reused: a clone with every field, including
// `is_reversed`, left unchanged.
ReversedUDWF::Identical => Some(Arc::new(self.clone())),
// No reversed form is available.
ReversedUDWF::NotSupported => None,
// Build a new expression around the replacement function, copying the
// remaining fields and flipping `is_reversed`.
ReversedUDWF::Reversed(fun) => Some(Arc::new(WindowUDFExpr {
fun,
args: self.args.clone(),
name: self.name.clone(),
input_types: self.input_types.clone(),
is_reversed: !self.is_reversed,
ignore_nulls: self.ignore_nulls,
})),
}
}

fn get_result_ordering(&self, schema: &SchemaRef) -> Option<PhysicalSortExpr> {
Expand Down

0 comments on commit a0a635a

Please sign in to comment.