Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
Signed-off-by: Veeupup <[email protected]>
  • Loading branch information
Veeupup committed May 6, 2022
1 parent c76c854 commit 53daa6d
Show file tree
Hide file tree
Showing 23 changed files with 60 additions and 110 deletions.
10 changes: 2 additions & 8 deletions common/datavalues/src/types/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,10 @@ pub trait DataType: std::fmt::Debug + Sync + Send + DynClone {
}

fn create_mutable(&self, capacity: usize) -> Box<dyn MutableColumn>;

fn create_serializer(&self) -> TypeSerializerImpl;
/// work only for timestamp serializer
fn create_serializer_with_tz(&self, _tz: Tz) -> TypeSerializerImpl {
unimplemented!()
}

fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl;
/// work only for timestamp deserializer
fn create_deserializer_with_tz(&self, _capacity: usize, _tz: Tz) -> TypeDeserializerImpl {
unimplemented!()
}
}

pub fn from_arrow_type(dt: &ArrowType) -> DataTypeImpl {
Expand Down
15 changes: 0 additions & 15 deletions common/datavalues/src/types/type_nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,6 @@ impl DataType for NullableType {
.into()
}

fn create_serializer_with_tz(&self, tz: Tz) -> TypeSerializerImpl {
NullableSerializer {
inner: Box::new(self.inner.create_serializer_with_tz(tz)),
}
.into()
}

fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl {
NullableDeserializer {
inner: Box::new(self.inner.create_deserializer(capacity)),
Expand All @@ -103,14 +96,6 @@ impl DataType for NullableType {
.into()
}

fn create_deserializer_with_tz(&self, capacity: usize, tz: Tz) -> TypeDeserializerImpl {
NullableDeserializer {
inner: Box::new(self.inner.create_deserializer_with_tz(capacity, tz)),
bitmap: MutableBitmap::with_capacity(capacity),
}
.into()
}

fn create_mutable(&self, capacity: usize) -> Box<dyn MutableColumn> {
Box::new(MutableNullableColumn::new(
self.inner.create_mutable(capacity),
Expand Down
13 changes: 0 additions & 13 deletions common/datavalues/src/types/type_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,6 @@ impl DataType for TimestampType {
TimestampSerializer::default().into()
}

fn create_serializer_with_tz(&self, tz: Tz) -> TypeSerializerImpl {
TimestampSerializer::new_with_tz(tz).into()
}

fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl {
let tz = "UTC".parse::<Tz>().unwrap();
TimestampDeserializer {
Expand All @@ -164,15 +160,6 @@ impl DataType for TimestampType {
.into()
}

fn create_deserializer_with_tz(&self, capacity: usize, tz: Tz) -> TypeDeserializerImpl {
TimestampDeserializer {
builder: MutablePrimitiveColumn::<i64>::with_capacity(capacity),
tz,
precision: self.precision,
}
.into()
}

fn create_mutable(&self, capacity: usize) -> Box<dyn MutableColumn> {
Box::new(MutablePrimitiveColumn::<i64>::with_capacity(capacity))
}
Expand Down
5 changes: 1 addition & 4 deletions common/functions/src/scalars/dates/interval_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,7 @@ where
_input_rows: usize,
) -> Result<ColumnRef> {
// Todo(zhyass): define the ctx out of the eval.
let tz = func_ctx.tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
let mut ctx = EvalContext::new(self.factor, self.precision, None, tz);
let mut ctx = EvalContext::new(self.factor, self.precision, None, func_ctx.tz.clone());
let col = scalar_binary_op(
columns[0].column(),
columns[1].column(),
Expand Down
4 changes: 1 addition & 3 deletions common/functions/src/scalars/dates/number_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,7 @@ where
// round_func need to calcute it with origin timezone
// such as in UTC: 2022-03-31 22:00 and in +8:00 time is 2022-04-01 6:00
// then the result of to the month of should be 2022-04-01 6:00 rather than 2022-03-01 22:00
let tz = func_ctx.tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
let tz = func_ctx.tz;
let func = |v: i64, _ctx: &mut EvalContext| {
let date_time = tz.timestamp(v / 1_000_000, 0_u32);
T::to_number(date_time, &tz)
Expand Down
5 changes: 1 addition & 4 deletions common/functions/src/scalars/dates/round_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ impl Function for RoundFunction {
) -> Result<common_datavalues::ColumnRef> {
let func = |val: i64, ctx: &mut EvalContext| self.execute(val, &ctx.tz);
let mut eval_context = EvalContext::default();
let tz = func_ctx.tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
eval_context.tz = tz;
eval_context.tz = func_ctx.tz.clone();
let col = scalar_unary_op::<i64, _, _>(columns[0].column(), func, &mut eval_context)?;
for micros in col.iter() {
let _ = check_timestamp(*micros)?;
Expand Down
4 changes: 1 addition & 3 deletions common/functions/src/scalars/dates/week_date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ where
mode = week_mode;
}

let tz = func_ctx.tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
let tz = func_ctx.tz;

match columns[0].data_type().data_type_id() {
TypeID::Date => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ pub fn cast_from_timestamp(
match data_type.data_type_id() {
TypeID::String => {
let mut builder = MutableStringColumn::with_capacity(size);
let tz = func_ctx.tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
let tz = func_ctx.tz;
for v in c.iter() {
let s = timestamp_to_string(
tz.timestamp(*v / 1_000_000, (*v % 1_000_000 * 1_000) as u32),
Expand Down
13 changes: 8 additions & 5 deletions common/functions/src/scalars/expressions/cast_from_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// use chrono_tz::Tz;
use chrono_tz::Tz;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::temporal_conversions::EPOCH_DAYS_FROM_CE;
use common_datavalues::chrono::DateTime;
use common_datavalues::chrono::Datelike;
use common_datavalues::chrono::TimeZone;
use common_datavalues::chrono::NaiveDate;
use common_datavalues::chrono::NaiveDateTime;
use common_datavalues::prelude::*;
use common_exception::Result;
use common_exception::ErrorCode;

use super::cast_with_type::arrow_cast_compute;
use super::cast_with_type::new_mutable_bitmap;
Expand Down Expand Up @@ -55,9 +58,9 @@ pub fn cast_from_string(

TypeID::Timestamp => {
let mut builder = ColumnBuilder::<i64>::with_capacity(size);

let tz = func_ctx.tz;
for (row, v) in str_column.iter().enumerate() {
match string_to_timestamp(v) {
match string_to_timestamp(v, &tz) {
Some(d) => {
builder.append(d.timestamp_micros());
}
Expand Down Expand Up @@ -86,9 +89,9 @@ pub fn cast_from_string(

// TODO support timezone
#[inline]
pub fn string_to_timestamp(date_str: impl AsRef<[u8]>) -> Option<NaiveDateTime> {
pub fn string_to_timestamp(date_str: impl AsRef<[u8]>, tz: &Tz) -> Option<DateTime<Tz>> {
let s = std::str::from_utf8(date_str.as_ref()).ok();
s.and_then(|c| NaiveDateTime::parse_from_str(c, "%Y-%m-%d %H:%M:%S%.9f").ok())
s.and_then(|c| tz.datetime_from_str(c, "%Y-%m-%d %H:%M:%S%.f").ok())
}

#[inline]
Expand Down
9 changes: 6 additions & 3 deletions common/functions/src/scalars/expressions/cast_from_variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono_tz::Tz;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::temporal_conversions::EPOCH_DAYS_FROM_CE;
use common_datavalues::chrono::Datelike;
use common_datavalues::prelude::*;
use common_datavalues::with_match_primitive_type_id;
use common_exception::ErrorCode;
use common_exception::Result;
use serde_json::Value as JsonValue;

use serde_json::Value as JsonValue;
use crate::scalars::FunctionContext;
use super::cast_from_string::string_to_date;
use super::cast_from_string::string_to_timestamp;
use super::cast_with_type::new_mutable_bitmap;

pub fn cast_from_variant(
column: &ColumnRef,
data_type: &DataTypeImpl,
func_ctx: &FunctionContext,
) -> Result<(ColumnRef, Option<Bitmap>)> {
let column = Series::remove_nullable(column);
let json_column: &VariantColumn = if column.is_const() {
Expand Down Expand Up @@ -134,12 +137,12 @@ pub fn cast_from_variant(
TypeID::Timestamp => {
// TODO(veeupup): support datetime with precision
let mut builder = ColumnBuilder::<i64>::with_capacity(size);

let tz = func_ctx.tz;
for (row, value) in json_column.iter().enumerate() {
match value.as_ref() {
JsonValue::Null => bitmap.set(row, false),
JsonValue::String(v) => {
if let Some(d) = string_to_timestamp(v) {
if let Some(d) = string_to_timestamp(v, &tz) {
builder.append(d.timestamp_micros());
} else {
bitmap.set(row, false);
Expand Down
11 changes: 2 additions & 9 deletions common/functions/src/scalars/expressions/cast_with_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pub fn cast_with_type(
func_ctx,
),
TypeID::Variant | TypeID::VariantArray | TypeID::VariantObject => {
cast_from_variant(column, &nonull_data_type)
cast_from_variant(column, &nonull_data_type, func_ctx)
}
_ => arrow_cast_compute(
column,
Expand Down Expand Up @@ -228,14 +228,7 @@ pub fn cast_to_variant(
}
let mut builder = ColumnBuilder::<VariantValue>::with_capacity(size);
if from_type.data_type_id().is_numeric() || from_type.data_type_id() == TypeID::Boolean {
let serializer = if from_type.data_type_id() == TypeID::Timestamp {
let tz = func_ctx.tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
from_type.create_serializer_with_tz(tz)
} else {
from_type.create_serializer()
};
let serializer = from_type.create_serializer();
let format = FormatSettings::default();
match serializer.serialize_json_object(&column, None, &format) {
Ok(values) => {
Expand Down
5 changes: 3 additions & 2 deletions common/functions/src/scalars/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::fmt;

use chrono_tz::Tz;
use common_datavalues::ColumnRef;
use common_datavalues::ColumnsWithField;
use common_datavalues::DataTypeImpl;
Expand All @@ -25,13 +26,13 @@ use super::Monotonicity;
/// for now, this is only store Timezone
#[derive(Clone)]
pub struct FunctionContext {
pub tz: String,
pub tz: Tz,
}

impl Default for FunctionContext {
fn default() -> Self {
Self {
tz: "UTC".to_string(),
tz: "UTC".parse::<Tz>().unwrap(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/io/src/format_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Default for FormatSettings {
empty_as_default: false,
skip_header: false,
compression: Compression::None,
timezone: vec![b'U', b'T', b'C'],
timezone: "UTC".as_bytes().to_vec(),
}
}
}
Expand Down
9 changes: 1 addition & 8 deletions common/streams/src/sources/source_csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,7 @@ where R: AsyncRead + Unpin + Send
.fields()
.iter()
.map(|f| {
if f.data_type().data_type_id() == TypeID::Timestamp {
f.data_type().create_deserializer_with_tz(
self.builder.block_size,
self.builder.tz.clone(),
)
} else {
f.data_type().create_deserializer(self.builder.block_size)
}
f.data_type().create_deserializer(self.builder.block_size)
})
.collect::<Vec<_>>();

Expand Down
9 changes: 1 addition & 8 deletions common/streams/src/sources/source_ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,7 @@ where R: AsyncBufRead + Unpin + Send
.fields()
.iter()
.map(|f| {
if f.data_type().data_type_id() == TypeID::Timestamp {
f.data_type().create_deserializer_with_tz(
self.builder.block_size,
self.builder.tz.clone(),
)
} else {
f.data_type().create_deserializer(self.builder.block_size)
}
f.data_type().create_deserializer(self.builder.block_size)
})
.collect::<Vec<_>>();

Expand Down
4 changes: 4 additions & 0 deletions query/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::VecDeque;
use std::sync::Arc;

use chrono_tz::Tz;
use common_datavalues::DataType;
use common_exception::ErrorCode;
use common_exception::Result;
Expand Down Expand Up @@ -120,6 +121,9 @@ impl InsertInterpreter {
let tz = String::from_utf8(tz).map_err(|_| {
ErrorCode::LogicalError("Timezone has been checked and should be valid.")
})?;
let tz = tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
let func_ctx = FunctionContext { tz };
pipeline.add_transform(|transform_input_port, transform_output_port| {
TransformCastSchema::try_create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;
use std::sync::Arc;
use chrono_tz::Tz;

use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
Expand Down Expand Up @@ -197,6 +198,9 @@ impl ExpressionExecutor {
let tz = String::from_utf8(tz).map_err(|_| {
ErrorCode::LogicalError("Timezone has beeen checked and should be valid.")
})?;
let tz = tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
let func_ctx = FunctionContext { tz };
let column = f.func.eval(func_ctx, &arg_columns, rows)?;
Ok(ColumnWithField::new(
Expand Down
5 changes: 4 additions & 1 deletion query/src/pipelines/transforms/transform_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::any::Any;
use std::sync::Arc;

use chrono_tz::Tz;
use common_datavalues::DataSchemaRef;
use common_datavalues::DataType;
use common_exception::ErrorCode;
Expand Down Expand Up @@ -100,6 +100,9 @@ impl Processor for SinkTransform {
let tz = String::from_utf8(tz).map_err(|_| {
ErrorCode::LogicalError("Timezone has beeen checked and should be valid.")
})?;
let tz = tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
let func_ctx = FunctionContext { tz };
input_stream = Box::pin(CastStream::try_create(
input_stream,
Expand Down
11 changes: 1 addition & 10 deletions query/src/servers/clickhouse/writers/query_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,7 @@ pub fn to_clickhouse_block(block: DataBlock, format: &FormatSettings) -> Result<
let column = block.column(column_index);
let field = block.schema().field(column_index);
let name = field.name();
let serializer = if field.data_type().data_type_id() == TypeID::Timestamp {
let tz = String::from_utf8(format.timezone.clone())
.map_err(|_| ErrorCode::LogicalError("timezone must be set"))?;
let tz = tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
field.data_type().create_serializer_with_tz(tz)
} else {
field.data_type().create_serializer()
};
let serializer = field.data_type().create_serializer();
result.append_column(column::new_column(
name,
serializer.serialize_clickhouse_format(&column.convert_full_column(), format)?,
Expand Down
Loading

0 comments on commit 53daa6d

Please sign in to comment.