
Possible hidden schema mismatch for HashJoin in ProjectionExec #13673

Open
jayzhan211 opened this issue Dec 6, 2024 · 2 comments
Labels
bug Something isn't working

Comments

@jayzhan211
Contributor

jayzhan211 commented Dec 6, 2024

Describe the bug

The input schema and the projection's output schema can differ in HashJoin cases, because we may add an extra alias expression (for example, a cast of the join key) for the hash join. Since the projection schema lacks the extra alias column that the input schema has, resolving an expression against the wrong schema can cause a schema mismatch. In stats_projection() we don't run into this issue only because the error is ignored; I think we should not swallow the error but instead fix the schema and column-index matching.

// In stats_projection(): a failure from data_type() is silently dropped
if let Ok(data_type) = expr.data_type(&schema) {
    if let Some(value) = data_type.primitive_width() {
        primitive_row_size += value;
        continue;
    }
}

To Reproduce

Take this query for example:

statement ok
create table t1(name varchar, id int) as values ('df', 1), ('rs', 2), ('po', 3);

statement ok
create table t2(product varchar, sid bigint) as values ('er', 1), ('sd', 2), ('ge', 3);

query TT
select t1.name, t2.product from t1 inner join t2 on t1.id = t2.sid;
----
df er
rs sd
po ge

If we add the following debug code to ProjectionExec::try_new()

        // Construct a map from the input expressions to the output expression of the Projection
        let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?;
        let cache =
            Self::compute_properties(&input, &projection_mapping, Arc::clone(&schema))?;
        
        println!("schema: {:?}", schema);
        println!("input schema: {:?}", input_schema);
        for (e, _) in expr.iter() {
            // panic here
            let _dt = e.data_type(&schema).unwrap();
        }

        Ok(Self {
            expr,
            schema,
            input,
            metrics: ExecutionPlanMetricsSet::new(),
            cache,
        })

we can see that input_schema has an additional column CAST(t1.id AS Int64):

Schema {
  fields: [
    Field {
      name: "name",
      data_type: Utf8,
      nullable: true,
      dict_id: 0,
      dict_is_ordered: false,
      metadata: {}
    },
    Field {
      name: "id",
      data_type: Int32,
      nullable: true,
      dict_id: 0,
      dict_is_ordered: false,
      metadata: {}
    },
    Field {
      name: "product",
      data_type: Utf8,
      nullable: true,
      dict_id: 0,
      dict_is_ordered: false,
      metadata: {}
    },
    Field {
      name: "sid",
      data_type: Int64,
      nullable: true,
      dict_id: 0,
      dict_is_ordered: false,
      metadata: {}
    }
  ],
  metadata: {}
}

Input Schema {
  fields: [
    Field {
      name: "name",
      data_type: Utf8,
      nullable: true,
      dict_id: 0,
      dict_is_ordered: false,
      metadata: {}
    },
    Field {
      name: "id",
      data_type: Int32,
      nullable: true,
      dict_id: 0,
      dict_is_ordered: false,
      metadata: {}
    },
    Field {
      name: "CAST(t1.id AS Int64)",
      data_type: Int64,
      nullable: true,
      dict_id: 0,
      dict_is_ordered: false,
      metadata: {}
    },
    Field {
      name: "product",
      data_type: Utf8,
      nullable: true,
      dict_id: 0,
      dict_is_ordered: false,
      metadata: {}
    },
    Field {
      name: "sid",
      data_type: Int64,
      nullable: true,
      dict_id: 0,
      dict_is_ordered: false,
      metadata: {}
    }
  ],
  metadata: {}
}

If we use schema (the projection's output schema) to resolve the projection expressions, the column indices no longer line up, which trips Column's bounds check.

The error is an out-of-bounds index: sid was created with index 4 against input_schema (which has five fields), but the schema used for the projection has only four columns.

thread 'tokio-runtime-worker' panicked at datafusion/physical-plan/src/projection.rs:102:44:
called `Result::unwrap()` on an `Err` value: Internal("PhysicalExpr Column references column 'sid' at index 4 (zero-based) but input schema only has 4 columns: [\"name\", \"id\", \"product\", \"sid\"]")

Expected behavior

I think we should either recreate the expressions with indices that match the projection schema, or use input_schema for the projection. I'm not sure yet which is the right choice.

Additional context

No response

@jayzhan211 jayzhan211 added the bug Something isn't working label Dec 6, 2024
@jayzhan211
Contributor Author

@berkaysynnada Do you remember the rationale for ignoring the error in stats_projection?

@berkaysynnada
Contributor

I think it was written that way because it was assumed that every projection expression exists in the projection schema.
