From 18c173b38ba1a1c0ab97675dc28e542b2b642577 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Mar 2025 14:28:37 +0530 Subject: [PATCH 1/2] test: `is_schema_mismatch` --- src/event/format/mod.rs | 183 +++++++++++++++++++++++++++++++++++----- 1 file changed, 160 insertions(+), 23 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 58c35fc79..eb5c6f6cf 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -173,7 +173,7 @@ pub trait EventFormat: Sized { // prepare the record batch and new fields to be added let mut new_schema = Arc::new(Schema::new(schema)); - if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) { + if !is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) { return Err(anyhow!("Schema mismatch")); } new_schema = @@ -190,28 +190,6 @@ pub trait EventFormat: Sized { Ok((rb, is_first)) } - fn is_schema_matching( - new_schema: Arc, - storage_schema: &HashMap>, - static_schema_flag: bool, - ) -> bool { - if !static_schema_flag { - return true; - } - for field in new_schema.fields() { - let Some(storage_field) = storage_schema.get(field.name()) else { - return false; - }; - if field.name() != storage_field.name() { - return false; - } - if field.data_type() != storage_field.data_type() { - return false; - } - } - true - } - #[allow(clippy::too_many_arguments)] fn into_event( self, @@ -226,6 +204,28 @@ pub trait EventFormat: Sized { ) -> Result; } +fn is_schema_matching( + new_schema: Arc, + storage_schema: &HashMap>, + static_schema_flag: bool, +) -> bool { + if !static_schema_flag { + return true; + } + for field in new_schema.fields() { + let Some(storage_field) = storage_schema.get(field.name()) else { + return false; + }; + if field.name() != storage_field.name() { + return false; + } + if field.data_type() != storage_field.data_type() { + return false; + } + } + true +} + pub fn get_existing_field_names( inferred_schema: Arc, existing_schema: Option<&HashMap>>, @@ -369,3 +369,140 @@ pub fn override_data_type( Arc::new(Schema::new(updated_schema)) } + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc}; + + use arrow_schema::{DataType, Field, Schema}; + + use super::*; + + // Helper function to create a test field + fn create_field(name: &str, data_type: DataType) -> Arc { + Arc::new(Field::new(name.to_string(), data_type, true)) + } + + // Helper function to create a test schema + fn create_schema(fields: Vec>) -> Arc { + Arc::new(Schema::new(fields)) + } + + // Helper function to create a storage schema HashMap + fn create_storage_schema(fields: Vec>) -> HashMap> { + let mut storage_schema = HashMap::new(); + for field in fields { + storage_schema.insert(field.name().to_string(), field.clone()); + } + storage_schema + } + + #[test] + fn test_static_schema_flag_false() { + // When static_schema_flag is false, should always return true regardless of schemas + let field1 = create_field("id", DataType::Int32); + let field2 = create_field("name", DataType::Utf8); + + let schema = create_schema(vec![field1.clone(), field2.clone()]); + let storage_schema = create_storage_schema(vec![field1.clone()]); // Missing field2 + + // Even though schemas don't match, function should return true because static_schema_flag is false + assert!(is_schema_matching(schema, &storage_schema, false)); + } + + #[test] + fn test_identical_schemas() { + // When schemas are identical, should return true + let field1 = create_field("id", DataType::Int32); + let field2 = create_field("name", DataType::Utf8); + + let schema = create_schema(vec![field1.clone(), field2.clone()]); + let storage_schema = create_storage_schema(vec![field1.clone(), field2.clone()]); + + assert!(is_schema_matching(schema, &storage_schema, true)); + } + + #[test] + fn test_missing_field_in_storage() { + // When storage schema is missing a field from new schema, should return false + let field1 = create_field("id", DataType::Int32); + let field2 = create_field("name", DataType::Utf8); + + let schema = create_schema(vec![field1.clone(), field2.clone()]); + let storage_schema = create_storage_schema(vec![field1.clone()]); // Missing field2 + + assert!(!is_schema_matching(schema, &storage_schema, true)); + } + + #[test] + fn test_different_data_type() { + // When field has different data type, should return false + let field1 = create_field("id", DataType::Int32); + // Same name but different type + let field1_different_type = create_field("id", DataType::Int64); + let field2 = create_field("name", DataType::Utf8); + + let schema = create_schema(vec![field1.clone(), field2.clone()]); + let storage_schema = create_storage_schema(vec![field1_different_type, field2.clone()]); + + assert!(!is_schema_matching(schema, &storage_schema, true)); + } + + #[test] + fn test_extra_fields_in_storage() { + // When storage schema has extra fields not in new schema, should still return true + // This is because we only check if fields in new_schema exist in storage_schema + let field1 = create_field("id", DataType::Int32); + let field2 = create_field("name", DataType::Utf8); + let extra_field = create_field("extra", DataType::Boolean); + + let schema = create_schema(vec![field1.clone(), field2.clone()]); + let storage_schema = + create_storage_schema(vec![field1.clone(), field2.clone(), extra_field]); + + assert!(is_schema_matching(schema, &storage_schema, true)); + } + + #[test] + fn test_empty_new_schema() { + // When new schema is empty, should return true + let field1 = create_field("id", DataType::Int32); + + let empty_schema = create_schema(vec![]); + let storage_schema = create_storage_schema(vec![field1.clone()]); + + assert!(is_schema_matching( + empty_schema, + &storage_schema, + true + )); + } + + #[test] + fn test_empty_storage_schema() { + // When storage schema is empty but new schema has fields, should return false + let field1 = create_field("id", DataType::Int32); + + let schema = create_schema(vec![field1.clone()]); + let empty_storage_schema: HashMap> = HashMap::new(); + + assert!(!is_schema_matching( + schema, + &empty_storage_schema, + true + )); + } + + #[test] + fn test_both_empty_schemas() { + // When both schemas are empty, should return true + let empty_schema = create_schema(vec![]); + let empty_storage_schema: HashMap> = HashMap::new(); + + assert!(is_schema_matching( + empty_schema, + &empty_storage_schema, + true + )); + } +} From 35ad7459af4176f7d78e7fbeea8778261be58209 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Mar 2025 14:54:47 +0530 Subject: [PATCH 2/2] refactor: schema mismatch check --- src/event/format/mod.rs | 113 +++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 66 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index eb5c6f6cf..c17bd0f38 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -171,13 +171,18 @@ pub trait EventFormat: Sized { )), ); - // prepare the record batch and new fields to be added - let mut new_schema = Arc::new(Schema::new(schema)); - if !is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) { + if !is_schema_matching(&schema, storage_schema, static_schema_flag) { return Err(anyhow!("Schema mismatch")); } - new_schema = - update_field_type_in_schema(new_schema, None, time_partition, None, schema_version); + + // prepare the record batch and new fields to be added + let new_schema = update_field_type_in_schema( + Arc::new(Schema::new(schema)), + None, + time_partition, + None, + schema_version, + ); let mut rb = Self::decode(data, new_schema.clone())?; rb = replace_columns( @@ -204,26 +209,22 @@ pub trait EventFormat: Sized { ) -> Result; } +/// Determines if a schema matches the storage schema based on configuration. +/// Returns `true` if the schemas match according to the rules: +/// - If `static_schema_flag` is `false`, always returns `true` (flexible schema mode) +/// - If `static_schema_flag` is `true`, returns `true` only if all fields in `schema` +/// exist in `storage_schema` with exactly matching properties fn is_schema_matching( - new_schema: Arc, + schema: &[Arc], storage_schema: &HashMap>, static_schema_flag: bool, ) -> bool { - if !static_schema_flag { - return true; - } - for field in new_schema.fields() { - let Some(storage_field) = storage_schema.get(field.name()) else { - return false; - }; - if field.name() != storage_field.name() { - return false; - } - if field.data_type() != storage_field.data_type() { - return false; - } - } - true + !static_schema_flag + || !schema.iter().any(|field| { + storage_schema + .get(field.name()) + .is_none_or(|storage_field| storage_field != field) + }) } pub fn get_existing_field_names( @@ -374,7 +375,7 @@ pub fn override_data_type( mod tests { use std::{collections::HashMap, sync::Arc}; - use arrow_schema::{DataType, Field, Schema}; + use arrow_schema::{DataType, Field}; use super::*; @@ -383,13 +384,8 @@ mod tests { Arc::new(Field::new(name.to_string(), data_type, true)) } - // Helper function to create a test schema - fn create_schema(fields: Vec>) -> Arc { - Arc::new(Schema::new(fields)) - } - // Helper function to create a storage schema HashMap - fn create_storage_schema(fields: Vec>) -> HashMap> { + fn create_storage_schema(fields: &[Arc]) -> HashMap> { let mut storage_schema = HashMap::new(); for field in fields { storage_schema.insert(field.name().to_string(), field.clone()); @@ -403,11 +399,11 @@ mod tests { let field1 = create_field("id", DataType::Int32); let field2 = create_field("name", DataType::Utf8); - let schema = create_schema(vec![field1.clone(), field2.clone()]); - let storage_schema = create_storage_schema(vec![field1.clone()]); // Missing field2 + let schema = [field1.clone(), field2.clone()]; + let storage_schema = create_storage_schema(&[field1.clone()]); // Missing field2 // Even though schemas don't match, function should return true because static_schema_flag is false - assert!(is_schema_matching(schema, &storage_schema, false)); + assert!(is_schema_matching(&schema, &storage_schema, false)); } #[test] @@ -416,10 +412,10 @@ mod tests { let field1 = create_field("id", DataType::Int32); let field2 = create_field("name", DataType::Utf8); - let schema = create_schema(vec![field1.clone(), field2.clone()]); - let storage_schema = create_storage_schema(vec![field1.clone(), field2.clone()]); + let schema = [field1.clone(), field2.clone()]; + let storage_schema = create_storage_schema(&schema); - assert!(is_schema_matching(schema, &storage_schema, true)); + assert!(is_schema_matching(&schema, &storage_schema, true)); } #[test] @@ -428,10 +424,10 @@ mod tests { let field1 = create_field("id", DataType::Int32); let field2 = create_field("name", DataType::Utf8); - let schema = create_schema(vec![field1.clone(), field2.clone()]); - let storage_schema = create_storage_schema(vec![field1.clone()]); // Missing field2 + let schema = [field1.clone(), field2.clone()]; + let storage_schema = create_storage_schema(&[field1.clone()]); // Missing field2 - assert!(!is_schema_matching(schema, &storage_schema, true)); + assert!(!is_schema_matching(&schema, &storage_schema, true)); } #[test] @@ -442,10 +438,10 @@ mod tests { let field1_different_type = create_field("id", DataType::Int64); let field2 = create_field("name", DataType::Utf8); - let schema = create_schema(vec![field1.clone(), field2.clone()]); - let storage_schema = create_storage_schema(vec![field1_different_type, field2.clone()]); + let schema = [field1.clone(), field2.clone()]; + let storage_schema = create_storage_schema(&[field1_different_type, field2.clone()]); - assert!(!is_schema_matching(schema, &storage_schema, true)); + assert!(!is_schema_matching(&schema, &storage_schema, true)); } #[test] @@ -456,11 +452,10 @@ mod tests { let field2 = create_field("name", DataType::Utf8); let extra_field = create_field("extra", DataType::Boolean); - let schema = create_schema(vec![field1.clone(), field2.clone()]); - let storage_schema = - create_storage_schema(vec![field1.clone(), field2.clone(), extra_field]); + let schema = [field1.clone(), field2.clone()]; + let storage_schema = create_storage_schema(&[field1.clone(), field2.clone(), extra_field]); - assert!(is_schema_matching(schema, &storage_schema, true)); + assert!(is_schema_matching(&schema, &storage_schema, true)); } #[test] @@ -468,14 +463,9 @@ mod tests { // When new schema is empty, should return true let field1 = create_field("id", DataType::Int32); - let empty_schema = create_schema(vec![]); - let storage_schema = create_storage_schema(vec![field1.clone()]); + let storage_schema = create_storage_schema(&[field1.clone()]); - assert!(is_schema_matching( - empty_schema, - &storage_schema, - true - )); + assert!(is_schema_matching(&[], &storage_schema, true)); } #[test] @@ -483,26 +473,17 @@ mod tests { // When storage schema is empty but new schema has fields, should return false let field1 = create_field("id", DataType::Int32); - let schema = create_schema(vec![field1.clone()]); - let empty_storage_schema: HashMap> = HashMap::new(); + let schema = [field1.clone()]; + let empty_storage_schema = HashMap::new(); - assert!(!is_schema_matching( - schema, - &empty_storage_schema, - true - )); + assert!(!is_schema_matching(&schema, &empty_storage_schema, true)); } #[test] fn test_both_empty_schemas() { // When both schemas are empty, should return true - let empty_schema = create_schema(vec![]); - let empty_storage_schema: HashMap> = HashMap::new(); - - assert!(is_schema_matching( - empty_schema, - &empty_storage_schema, - true - )); + let empty_storage_schema = HashMap::new(); + + assert!(is_schema_matching(&[], &empty_storage_schema, true)); } }