Skip to content

Commit

Permalink
Add not_impl_err error macro (apache#7340)
Browse files Browse the repository at this point in the history
* Add `not_impl_err` error macro

* fmt
  • Loading branch information
comphead authored Aug 21, 2023
1 parent 4275e15 commit a514b67
Show file tree
Hide file tree
Showing 61 changed files with 485 additions and 717 deletions.
9 changes: 3 additions & 6 deletions benchmarks/src/tpch/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::fs;
use std::path::{Path, PathBuf};
use std::time::Instant;

use datafusion::common::not_impl_err;
use datafusion::error::DataFusionError;
use datafusion::error::Result;
use datafusion::prelude::*;
Expand Down Expand Up @@ -117,9 +118,7 @@ impl ConvertOpt {
ctx.write_parquet(csv, output_path, Some(props)).await?
}
other => {
return Err(DataFusionError::NotImplemented(format!(
"Invalid output format: {other}"
)));
return not_impl_err!("Invalid output format: {other}");
}
}
println!("Conversion completed in {} ms", start.elapsed().as_millis());
Expand All @@ -139,9 +138,7 @@ impl ConvertOpt {
"lz0" => Compression::LZO,
"zstd" => Compression::ZSTD(Default::default()),
other => {
return Err(DataFusionError::NotImplemented(format!(
"Invalid compression format: {other}"
)));
return not_impl_err!("Invalid compression format: {other}");
}
})
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,13 @@ make_error!(plan_err, Plan);
// Exposes a macro to create `DataFusionError::Internal`
make_error!(internal_err, Internal);

// Exposes a macro to create `DataFusionError::NotImplemented`
make_error!(not_impl_err, NotImplemented);

// To avoid compiler error when using macro in the same crate:
// macros from the current crate cannot be referred to by absolute paths
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;

#[cfg(test)]
mod test {
Expand Down
5 changes: 2 additions & 3 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
str::FromStr,
};

use crate::error::_not_impl_err;
use crate::{DataFusionError, Result};

/// Join type
Expand Down Expand Up @@ -81,9 +82,7 @@ impl FromStr for JoinType {
"RIGHTSEMI" => Ok(JoinType::RightSemi),
"LEFTANTI" => Ok(JoinType::LeftAnti),
"RIGHTANTI" => Ok(JoinType::RightAnti),
_ => Err(DataFusionError::NotImplemented(format!(
"The join type {s} does not exist or is not implemented"
))),
_ => _not_impl_err!("The join type {s} does not exist or is not implemented"),
}
}
}
Expand Down
30 changes: 14 additions & 16 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::cast::{
as_fixed_size_binary_array, as_fixed_size_list_array, as_list_array, as_struct_array,
};
use crate::delta::shift_months;
use crate::error::{DataFusionError, Result, _internal_err};
use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err};
use arrow::buffer::NullBuffer;
use arrow::compute::nullif;
use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder};
Expand Down Expand Up @@ -617,9 +617,7 @@ macro_rules! decimal_right {
-$TERM
};
($TERM:expr, /) => {
Err(DataFusionError::NotImplemented(format!(
"Decimal reciprocation not yet supported",
)))
_not_impl_err!("Decimal reciprocation not yet supported",)
};
}

Expand Down Expand Up @@ -1870,9 +1868,9 @@ impl ScalarValue {
ScalarValue::DurationNanosecond(None)
}
_ => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create a zero scalar from data_type \"{datatype:?}\""
)));
);
}
})
}
Expand All @@ -1892,9 +1890,9 @@ impl ScalarValue {
DataType::Float32 => ScalarValue::Float32(Some(1.0)),
DataType::Float64 => ScalarValue::Float64(Some(1.0)),
_ => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create an one scalar from data_type \"{datatype:?}\""
)));
);
}
})
}
Expand All @@ -1910,9 +1908,9 @@ impl ScalarValue {
DataType::Float32 => ScalarValue::Float32(Some(-1.0)),
DataType::Float64 => ScalarValue::Float64(Some(-1.0)),
_ => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create a negative one scalar from data_type \"{datatype:?}\""
)));
);
}
})
}
Expand All @@ -1931,9 +1929,9 @@ impl ScalarValue {
DataType::Float32 => ScalarValue::Float32(Some(10.0)),
DataType::Float64 => ScalarValue::Float64(Some(10.0)),
_ => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create a negative one scalar from data_type \"{datatype:?}\""
)));
);
}
})
}
Expand Down Expand Up @@ -3257,9 +3255,9 @@ impl ScalarValue {
}

other => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create a scalar from array of type \"{other:?}\""
)));
);
}
})
}
Expand Down Expand Up @@ -3820,9 +3818,9 @@ impl TryFrom<&DataType> for ScalarValue {
DataType::Struct(fields) => ScalarValue::Struct(None, fields.clone()),
DataType::Null => ScalarValue::Null,
_ => {
return Err(DataFusionError::NotImplemented(format!(
return _not_impl_err!(
"Can't create a scalar from data_type \"{datatype:?}\""
)));
);
}
})
}
Expand Down
10 changes: 3 additions & 7 deletions datafusion/core/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub use datafusion_sql::{ResolvedTableReference, TableReference};

use crate::catalog::schema::SchemaProvider;
use dashmap::DashMap;
use datafusion_common::{DataFusionError, Result};
use datafusion_common::{not_impl_err, DataFusionError, Result};
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -125,9 +125,7 @@ pub trait CatalogProvider: Sync + Send {
// use variables to avoid unused variable warnings
let _ = name;
let _ = schema;
Err(DataFusionError::NotImplemented(
"Registering new schemas is not supported".to_string(),
))
not_impl_err!("Registering new schemas is not supported")
}

/// Removes a schema from this catalog. Implementations of this method should return
Expand All @@ -145,9 +143,7 @@ pub trait CatalogProvider: Sync + Send {
_name: &str,
_cascade: bool,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
Err(DataFusionError::NotImplemented(
"Deregistering new schemas is not supported".to_string(),
))
not_impl_err!("Deregistering new schemas is not supported")
}
}

Expand Down
16 changes: 5 additions & 11 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::{self, datatypes::SchemaRef};
use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;
use datafusion_common::{not_impl_err, DataFusionError};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;

Expand Down Expand Up @@ -267,15 +267,11 @@ impl FileFormat for CsvFormat {
conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrites are not implemented yet for CSV".into(),
));
return not_impl_err!("Overwrites are not implemented yet for CSV");
}

if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
return Err(DataFusionError::NotImplemented(
"Inserting compressed CSV is not implemented yet.".into(),
));
return not_impl_err!("Inserting compressed CSV is not implemented yet.");
}

let sink_schema = conf.output_schema().clone();
Expand Down Expand Up @@ -512,7 +508,7 @@ impl DataSink for CsvSink {
match self.config.writer_mode {
FileWriterMode::Append => {
if !self.config.per_thread_output {
return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for CsvSink in Append mode".into()));
return not_impl_err!("per_thread_output=false is not implemented for CsvSink in Append mode");
}
for file_group in &self.config.file_groups {
// In append mode, consider has_header flag only when file is empty (at the start).
Expand All @@ -538,9 +534,7 @@ impl DataSink for CsvSink {
}
}
FileWriterMode::Put => {
return Err(DataFusionError::NotImplemented(
"Put Mode is not implemented for CSV Sink yet".into(),
))
return not_impl_err!("Put Mode is not implemented for CSV Sink yet")
}
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
Expand Down
22 changes: 6 additions & 16 deletions datafusion/core/src/datasource/file_format/file_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! File type abstraction
use crate::common::internal_err;
use crate::common::{internal_err, not_impl_err};
use crate::datasource::file_format::arrow::DEFAULT_ARROW_EXTENSION;
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
Expand Down Expand Up @@ -144,9 +144,7 @@ impl FileCompressionType {
.boxed(),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
return not_impl_err!("Compression feature is not enabled")
}
UNCOMPRESSED => s.boxed(),
})
Expand All @@ -169,9 +167,7 @@ impl FileCompressionType {
ZSTD => Box::new(ZstdEncoder::new(w)),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
return not_impl_err!("Compression feature is not enabled")
}
UNCOMPRESSED => w,
})
Expand Down Expand Up @@ -201,9 +197,7 @@ impl FileCompressionType {
.boxed(),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
return not_impl_err!("Compression feature is not enabled")
}
UNCOMPRESSED => s.boxed(),
})
Expand All @@ -228,9 +222,7 @@ impl FileCompressionType {
},
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
return not_impl_err!("Compression feature is not enabled")
}
UNCOMPRESSED => Box::new(r),
})
Expand Down Expand Up @@ -275,9 +267,7 @@ impl FromStr for FileType {
"PARQUET" => Ok(FileType::PARQUET),
"CSV" => Ok(FileType::CSV),
"JSON" | "NDJSON" => Ok(FileType::JSON),
_ => Err(DataFusionError::NotImplemented(format!(
"Unknown FileType: {s}"
))),
_ => not_impl_err!("Unknown FileType: {s}"),
}
}
}
Expand Down
15 changes: 5 additions & 10 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::any::Any;

use bytes::Bytes;
use datafusion_common::not_impl_err;
use datafusion_common::DataFusionError;
use datafusion_execution::TaskContext;
use rand::distributions::Alphanumeric;
Expand Down Expand Up @@ -174,15 +175,11 @@ impl FileFormat for JsonFormat {
conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return Err(DataFusionError::NotImplemented(
"Overwrites are not implemented yet for Json".into(),
));
return not_impl_err!("Overwrites are not implemented yet for Json");
}

if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
return Err(DataFusionError::NotImplemented(
"Inserting compressed JSON is not implemented yet.".into(),
));
return not_impl_err!("Inserting compressed JSON is not implemented yet.");
}
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));
Expand Down Expand Up @@ -281,7 +278,7 @@ impl DataSink for JsonSink {
match self.config.writer_mode {
FileWriterMode::Append => {
if !self.config.per_thread_output {
return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for JsonSink in Append mode".into()));
return not_impl_err!("per_thread_output=false is not implemented for JsonSink in Append mode");
}
for file_group in &self.config.file_groups {
let serializer = JsonSerializer::new();
Expand All @@ -299,9 +296,7 @@ impl DataSink for JsonSink {
}
}
FileWriterMode::Put => {
return Err(DataFusionError::NotImplemented(
"Put Mode is not implemented for Json Sink yet".into(),
))
return not_impl_err!("Put Mode is not implemented for Json Sink yet")
}
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use datafusion_common::DataFusionError;
use datafusion_common::{not_impl_err, DataFusionError};
use datafusion_physical_expr::PhysicalExpr;

use async_trait::async_trait;
Expand Down Expand Up @@ -100,8 +100,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
_state: &SessionState,
_conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let msg = "Writer not implemented for this format".to_owned();
Err(DataFusionError::NotImplemented(msg))
not_impl_err!("Writer not implemented for this format")
}
}

Expand Down
Loading

0 comments on commit a514b67

Please sign in to comment.