Skip to content

Commit

Permalink
Unparse TableScan with projections, filters or fetch to SQL string (a…
Browse files Browse the repository at this point in the history
…pache#12158)

* support to unparse table scan with projection

* support to unparse table_scan with filter and fetch

* add the doc for public function

* clippy

* remove pub get method for table scan

* fix merge conflict

* add issue reference

* fix typo
  • Loading branch information
goldmedal authored Sep 7, 2024
1 parent 82fb5b9 commit ddbdf4b
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 11 deletions.
36 changes: 36 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,19 @@ impl LogicalPlanBuilder {
.map(Self::new)
}

/// Convert a table provider into a builder with a TableScan with filter and fetch
pub fn scan_with_filters_fetch(
table_name: impl Into<TableReference>,
table_source: Arc<dyn TableSource>,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
fetch: Option<usize>,
) -> Result<Self> {
TableScan::try_new(table_name, table_source, projection, filters, fetch)
.map(LogicalPlan::TableScan)
.map(Self::new)
}

/// Wrap a plan in a window
pub fn window_plan(
input: LogicalPlan,
Expand Down Expand Up @@ -1424,6 +1437,29 @@ pub fn table_scan_with_filters(
LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
}

/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
/// filters, and inlined fetch.
/// This is mostly used for testing and documentation.
pub fn table_scan_with_filter_and_fetch(
name: Option<impl Into<TableReference>>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
fetch: Option<usize>,
) -> Result<LogicalPlanBuilder> {
let table_source = table_source(table_schema);
let name = name
.map(|n| n.into())
.unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
LogicalPlanBuilder::scan_with_filters_fetch(
name,
table_source,
projection,
filters,
fetch,
)
}

fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
let table_schema = Arc::new(table_schema.clone());
Arc::new(LogicalTableSource { table_schema })
Expand Down
99 changes: 91 additions & 8 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use crate::unparser::utils::unproject_agg_exprs;
use datafusion_common::{
internal_err, not_impl_err, plan_err, Column, DataFusionError, Result,
internal_err, not_impl_err, plan_err, Column, DataFusionError, Result, TableReference,
};
use datafusion_expr::{
expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, Projection,
SortExpr,
expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
LogicalPlanBuilder, Projection, SortExpr,
};
use sqlparser::ast::{self, Ident, SetExpr};

use crate::unparser::utils::unproject_agg_exprs;
use std::sync::Arc;

use super::{
ast::{
Expand Down Expand Up @@ -240,6 +240,19 @@ impl Unparser<'_> {
) -> Result<()> {
match plan {
LogicalPlan::TableScan(scan) => {
if scan.projection.is_some()
|| !scan.filters.is_empty()
|| scan.fetch.is_some()
{
let unparsed_table_scan =
Self::unparse_table_scan_pushdown(plan, None)?;
return self.select_to_sql_recursively(
&unparsed_table_scan,
query,
select,
relation,
);
}
let mut builder = TableRelationBuilder::default();
let mut table_parts = vec![];
if let Some(catalog_name) = scan.table_name.catalog() {
Expand Down Expand Up @@ -455,7 +468,10 @@ impl Unparser<'_> {
LogicalPlan::SubqueryAlias(plan_alias) => {
let (plan, mut columns) =
subquery_alias_inner_query_and_columns(plan_alias);

let plan = Self::unparse_table_scan_pushdown(
plan,
Some(plan_alias.alias.clone()),
)?;
if !columns.is_empty()
&& !self.dialect.supports_column_alias_in_table_alias()
{
Expand All @@ -467,7 +483,7 @@ impl Unparser<'_> {
};

// Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
let rewritten_plan = inject_column_aliases(inner_p, columns);
let rewritten_plan = inject_column_aliases(&inner_p, columns);
columns = vec![];

self.select_to_sql_recursively(
Expand All @@ -477,7 +493,7 @@ impl Unparser<'_> {
relation,
)?;
} else {
self.select_to_sql_recursively(plan, query, select, relation)?;
self.select_to_sql_recursively(&plan, query, select, relation)?;
}

relation.alias(Some(
Expand Down Expand Up @@ -532,6 +548,73 @@ impl Unparser<'_> {
}
}

fn unparse_table_scan_pushdown(
plan: &LogicalPlan,
alias: Option<TableReference>,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::TableScan(table_scan) => {
// TODO: support filters for table scan with alias. Remove this check after #12368 issue.
// see the issue: https://github.com/apache/datafusion/issues/12368
if alias.is_some() && !table_scan.filters.is_empty() {
return not_impl_err!(
"Subquery alias is not supported for table scan with pushdown filters"
);
}

let mut builder = LogicalPlanBuilder::scan(
table_scan.table_name.clone(),
Arc::clone(&table_scan.source),
None,
)?;
if let Some(project_vec) = &table_scan.projection {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let (qualifier, field) =
table_scan.projected_schema.qualified_field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(qualifier.cloned(), field.name().clone())
}
})
.collect::<Vec<_>>();
if let Some(alias) = alias {
builder = builder.alias(alias)?;
}
builder = builder.project(project_columns)?;
}

let filter_expr = table_scan
.filters
.iter()
.cloned()
.reduce(|acc, expr| acc.and(expr));
if let Some(filter) = filter_expr {
builder = builder.filter(filter)?;
}

if let Some(fetch) = table_scan.fetch {
builder = builder.limit(0, Some(fetch))?;
}

builder.build()
}
LogicalPlan::SubqueryAlias(subquery_alias) => {
let new_plan = Self::unparse_table_scan_pushdown(
&subquery_alias.input,
Some(subquery_alias.alias.clone()),
)?;
LogicalPlanBuilder::from(new_plan)
.alias(subquery_alias.alias.clone())?
.build()
}
_ => Ok(plan.clone()),
}
}

fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
match expr {
Expr::Alias(Alias { expr, name, .. }) => {
Expand Down
154 changes: 151 additions & 3 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ use std::vec;
use arrow_schema::*;
use datafusion_common::{DFSchema, Result, TableReference};
use datafusion_expr::test::function_stub::{count_udaf, max_udaf, min_udaf, sum_udaf};
use datafusion_expr::{col, table_scan};
use datafusion_expr::{col, lit, table_scan, wildcard, LogicalPlanBuilder};
use datafusion_sql::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_sql::unparser::dialect::{
DefaultDialect as UnparserDefaultDialect, Dialect as UnparserDialect,
MySqlDialect as UnparserMySqlDialect, SqliteDialect,
};
use datafusion_sql::unparser::{expr_to_sql, plan_to_sql, Unparser};

use crate::common::{MockContextProvider, MockSessionState};
use datafusion_expr::builder::{
table_scan_with_filter_and_fetch, table_scan_with_filters,
};
use datafusion_functions::core::planner::CoreFunctionPlanner;
use sqlparser::dialect::{Dialect, GenericDialect, MySqlDialect};
use sqlparser::parser::Parser;

use crate::common::{MockContextProvider, MockSessionState};

#[test]
fn roundtrip_expr() {
let tests: Vec<(TableReference, &str, &str)> = vec![
Expand Down Expand Up @@ -619,6 +621,152 @@ fn sql_round_trip(query: &str, expect: &str) {
assert_eq!(roundtrip_statement.to_string(), expect);
}

#[test]
fn test_table_scan_pushdown() -> Result<()> {
let schema = Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("age", DataType::Utf8, false),
]);

let scan_with_projection =
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?;
let scan_with_projection = plan_to_sql(&scan_with_projection)?;
assert_eq!(
format!("{}", scan_with_projection),
"SELECT t1.id, t1.age FROM t1"
);

let scan_with_no_projection = table_scan(Some("t1"), &schema, None)?.build()?;
let scan_with_no_projection = plan_to_sql(&scan_with_no_projection)?;
assert_eq!(format!("{}", scan_with_no_projection), "SELECT * FROM t1");

let table_scan_with_projection_alias =
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?
.alias("ta")?
.build()?;
let table_scan_with_projection_alias =
plan_to_sql(&table_scan_with_projection_alias)?;
assert_eq!(
format!("{}", table_scan_with_projection_alias),
"SELECT ta.id, ta.age FROM t1 AS ta"
);

let table_scan_with_no_projection_alias = table_scan(Some("t1"), &schema, None)?
.alias("ta")?
.build()?;
let table_scan_with_no_projection_alias =
plan_to_sql(&table_scan_with_no_projection_alias)?;
assert_eq!(
format!("{}", table_scan_with_no_projection_alias),
"SELECT * FROM t1 AS ta"
);

let query_from_table_scan_with_projection = LogicalPlanBuilder::from(
table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
)
.project(vec![wildcard()])?
.build()?;
let query_from_table_scan_with_projection =
plan_to_sql(&query_from_table_scan_with_projection)?;
assert_eq!(
format!("{}", query_from_table_scan_with_projection),
"SELECT * FROM (SELECT t1.id, t1.age FROM t1)"
);

let table_scan_with_filter = table_scan_with_filters(
Some("t1"),
&schema,
None,
vec![col("id").gt(col("age"))],
)?
.build()?;
let table_scan_with_filter = plan_to_sql(&table_scan_with_filter)?;
assert_eq!(
format!("{}", table_scan_with_filter),
"SELECT * FROM t1 WHERE (t1.id > t1.age)"
);

let table_scan_with_two_filter = table_scan_with_filters(
Some("t1"),
&schema,
None,
vec![col("id").gt(lit(1)), col("age").lt(lit(2))],
)?
.build()?;
let table_scan_with_two_filter = plan_to_sql(&table_scan_with_two_filter)?;
assert_eq!(
format!("{}", table_scan_with_two_filter),
"SELECT * FROM t1 WHERE ((t1.id > 1) AND (t1.age < 2))"
);

// TODO: support filters for table scan with alias. Enable this test after #12368 issue is fixed
// see the issue: https://github.com/apache/datafusion/issues/12368
// let table_scan_with_filter_alias = table_scan_with_filters(
// Some("t1"),
// &schema,
// None,
// vec![col("id").gt(col("age"))],
// )?.alias("ta")?.build()?;
// let table_scan_with_filter_alias = plan_to_sql(&table_scan_with_filter_alias)?;
// assert_eq!(
// format!("{}", table_scan_with_filter_alias),
// "SELECT * FROM t1 AS ta WHERE (ta.id > ta.age)"
// );

let table_scan_with_projection_and_filter = table_scan_with_filters(
Some("t1"),
&schema,
Some(vec![0, 1]),
vec![col("id").gt(col("age"))],
)?
.build()?;
let table_scan_with_projection_and_filter =
plan_to_sql(&table_scan_with_projection_and_filter)?;
assert_eq!(
format!("{}", table_scan_with_projection_and_filter),
"SELECT t1.id, t1.age FROM t1 WHERE (t1.id > t1.age)"
);

let table_scan_with_inline_fetch =
table_scan_with_filter_and_fetch(Some("t1"), &schema, None, vec![], Some(10))?
.build()?;
let table_scan_with_inline_fetch = plan_to_sql(&table_scan_with_inline_fetch)?;
assert_eq!(
format!("{}", table_scan_with_inline_fetch),
"SELECT * FROM t1 LIMIT 10"
);

let table_scan_with_projection_and_inline_fetch = table_scan_with_filter_and_fetch(
Some("t1"),
&schema,
Some(vec![0, 1]),
vec![],
Some(10),
)?
.build()?;
let table_scan_with_projection_and_inline_fetch =
plan_to_sql(&table_scan_with_projection_and_inline_fetch)?;
assert_eq!(
format!("{}", table_scan_with_projection_and_inline_fetch),
"SELECT t1.id, t1.age FROM t1 LIMIT 10"
);

let table_scan_with_all = table_scan_with_filter_and_fetch(
Some("t1"),
&schema,
Some(vec![0, 1]),
vec![col("id").gt(col("age"))],
Some(10),
)?
.build()?;
let table_scan_with_all = plan_to_sql(&table_scan_with_all)?;
assert_eq!(
format!("{}", table_scan_with_all),
"SELECT t1.id, t1.age FROM t1 WHERE (t1.id > t1.age) LIMIT 10"
);
Ok(())
}

#[test]
fn test_interval_lhs_eq() {
sql_round_trip(
Expand Down

0 comments on commit ddbdf4b

Please sign in to comment.