Skip to content

Commit

Permalink
[BugFix] Fix timestamp version2 (StarRocks#20868)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirtysalt authored Apr 3, 2023
1 parent 826f028 commit 17ad317
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 91 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified

int func_version = request.common().__isset.func_version
? request.common().func_version
: TFunctionVersion::type::FUNC_VERSION_UNIX_TIMESTAMP_INT64;
: TFunctionVersion::type::RUNTIME_FILTER_SERIALIZE_VERSION_2;
runtime_state->set_func_version(func_version);
runtime_state->init_mem_trackers(query_mem_tracker);
runtime_state->set_be_number(request.backend_num());
Expand Down
103 changes: 50 additions & 53 deletions be/src/exprs/time_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -992,13 +992,12 @@ StatusOr<ColumnPtr> TimeFunctions::_t_to_unix_from_datetime(FunctionContext* con
return result.build(ColumnHelper::is_all_const(columns));
}

StatusOr<ColumnPtr> TimeFunctions::to_unix_from_datetime(FunctionContext* context, const Columns& columns) {
int func_version = context->state()->func_version();
if (func_version >= TFunctionVersion::type::FUNC_VERSION_UNIX_TIMESTAMP_INT64) {
return _t_to_unix_from_datetime<TYPE_BIGINT>(context, columns);
} else {
return _t_to_unix_from_datetime<TYPE_INT>(context, columns);
}
StatusOr<ColumnPtr> TimeFunctions::to_unix_from_datetime_64(FunctionContext* context, const Columns& columns) {
return _t_to_unix_from_datetime<TYPE_BIGINT>(context, columns);
}

StatusOr<ColumnPtr> TimeFunctions::to_unix_from_datetime_32(FunctionContext* context, const Columns& columns) {
return _t_to_unix_from_datetime<TYPE_INT>(context, columns);
}

/*
Expand Down Expand Up @@ -1037,13 +1036,11 @@ StatusOr<ColumnPtr> TimeFunctions::_t_to_unix_from_date(FunctionContext* context
return result.build(ColumnHelper::is_all_const(columns));
}

StatusOr<ColumnPtr> TimeFunctions::to_unix_from_date(FunctionContext* context, const Columns& columns) {
int func_version = context->state()->func_version();
if (func_version >= TFunctionVersion::type::FUNC_VERSION_UNIX_TIMESTAMP_INT64) {
return _t_to_unix_from_date<TYPE_BIGINT>(context, columns);
} else {
return _t_to_unix_from_date<TYPE_INT>(context, columns);
}
StatusOr<ColumnPtr> TimeFunctions::to_unix_from_date_64(FunctionContext* context, const Columns& columns) {
return _t_to_unix_from_date<TYPE_BIGINT>(context, columns);
}
StatusOr<ColumnPtr> TimeFunctions::to_unix_from_date_32(FunctionContext* context, const Columns& columns) {
return _t_to_unix_from_date<TYPE_INT>(context, columns);
}

template <LogicalType TIMESTAMP_TYPE>
Expand Down Expand Up @@ -1088,28 +1085,30 @@ StatusOr<ColumnPtr> TimeFunctions::_t_to_unix_from_datetime_with_format(Function
return result.build(ColumnHelper::is_all_const(columns));
}

StatusOr<ColumnPtr> TimeFunctions::to_unix_from_datetime_with_format(FunctionContext* context, const Columns& columns) {
int func_version = context->state()->func_version();
if (func_version >= TFunctionVersion::type::FUNC_VERSION_UNIX_TIMESTAMP_INT64) {
return _t_to_unix_from_datetime_with_format<TYPE_BIGINT>(context, columns);
} else {
return _t_to_unix_from_datetime_with_format<TYPE_INT>(context, columns);
}
StatusOr<ColumnPtr> TimeFunctions::to_unix_from_datetime_with_format_64(FunctionContext* context,
const Columns& columns) {
return _t_to_unix_from_datetime_with_format<TYPE_BIGINT>(context, columns);
}

StatusOr<ColumnPtr> TimeFunctions::to_unix_from_datetime_with_format_32(FunctionContext* context,
const Columns& columns) {
return _t_to_unix_from_datetime_with_format<TYPE_INT>(context, columns);
}

StatusOr<ColumnPtr> TimeFunctions::to_unix_for_now(FunctionContext* context, const Columns& columns) {
int func_version = context->state()->func_version();
StatusOr<ColumnPtr> TimeFunctions::to_unix_for_now_64(FunctionContext* context, const Columns& columns) {
DCHECK_EQ(columns.size(), 0);
int64_t value = context->state()->timestamp_ms() / 1000;
if (func_version >= TFunctionVersion::type::FUNC_VERSION_UNIX_TIMESTAMP_INT64) {
auto result = Int64Column::create();
result->append(value);
return ConstColumn::create(result, 1);
} else {
auto result = Int32Column::create();
result->append(value);
return ConstColumn::create(result, 1);
}
auto result = Int64Column::create();
result->append(value);
return ConstColumn::create(result, 1);
}

StatusOr<ColumnPtr> TimeFunctions::to_unix_for_now_32(FunctionContext* context, const Columns& columns) {
DCHECK_EQ(columns.size(), 0);
int64_t value = context->state()->timestamp_ms() / 1000;
auto result = Int32Column::create();
result->append(value);
return ConstColumn::create(result, 1);
}

/*
Expand Down Expand Up @@ -1154,13 +1153,12 @@ StatusOr<ColumnPtr> TimeFunctions::_t_from_unix_to_datetime(FunctionContext* con
return result.build(ColumnHelper::is_all_const(columns));
}

StatusOr<ColumnPtr> TimeFunctions::from_unix_to_datetime(FunctionContext* context, const Columns& columns) {
int func_version = context->state()->func_version();
if (func_version >= TFunctionVersion::type::FUNC_VERSION_UNIX_TIMESTAMP_INT64) {
return _t_from_unix_to_datetime<TYPE_BIGINT>(context, columns);
} else {
return _t_from_unix_to_datetime<TYPE_INT>(context, columns);
}
StatusOr<ColumnPtr> TimeFunctions::from_unix_to_datetime_64(FunctionContext* context, const Columns& columns) {
return _t_from_unix_to_datetime<TYPE_BIGINT>(context, columns);
}

StatusOr<ColumnPtr> TimeFunctions::from_unix_to_datetime_32(FunctionContext* context, const Columns& columns) {
return _t_from_unix_to_datetime<TYPE_INT>(context, columns);
}

std::string TimeFunctions::convert_format(const Slice& format) {
Expand Down Expand Up @@ -1310,26 +1308,25 @@ StatusOr<ColumnPtr> TimeFunctions::_t_from_unix_with_format_const(std::string& f
return result.build(ColumnHelper::is_all_const(columns));
}

StatusOr<ColumnPtr> TimeFunctions::from_unix_to_datetime_with_format(FunctionContext* context,
const starrocks::Columns& columns) {
template <LogicalType TIMESTAMP_TYPE>
StatusOr<ColumnPtr> TimeFunctions::_t_from_unix_with_format(FunctionContext* context,
const starrocks::Columns& columns) {
DCHECK_EQ(columns.size(), 2);
int func_version = context->state()->func_version();
auto* state = reinterpret_cast<FromUnixState*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));

if (state->const_format) {
std::string format_content = state->format_content;
if (func_version >= TFunctionVersion::type::FUNC_VERSION_UNIX_TIMESTAMP_INT64) {
return _t_from_unix_with_format_const<TYPE_BIGINT>(format_content, context, columns);
} else {
return _t_from_unix_with_format_const<TYPE_INT>(format_content, context, columns);
}
return _t_from_unix_with_format_const<TIMESTAMP_TYPE>(format_content, context, columns);
}
return _t_from_unix_with_format_general<TIMESTAMP_TYPE>(context, columns);
}

if (func_version >= TFunctionVersion::type::FUNC_VERSION_UNIX_TIMESTAMP_INT64) {
return _t_from_unix_with_format_general<TYPE_BIGINT>(context, columns);
} else {
return _t_from_unix_with_format_general<TYPE_INT>(context, columns);
}
StatusOr<ColumnPtr> TimeFunctions::from_unix_to_datetime_with_format_64(FunctionContext* context,
const starrocks::Columns& columns) {
return _t_from_unix_with_format<TYPE_BIGINT>(context, columns);
}
StatusOr<ColumnPtr> TimeFunctions::from_unix_to_datetime_with_format_32(FunctionContext* context,
const starrocks::Columns& columns) {
return _t_from_unix_with_format<TYPE_INT>(context, columns);
}

/*
Expand Down
18 changes: 12 additions & 6 deletions be/src/exprs/time_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,35 +534,39 @@ class TimeFunctions {
* @paramType columns: [BinaryColumn, BinaryColumn]
* @return BigIntColumn
*/
DEFINE_VECTORIZED_FN(to_unix_from_datetime_with_format);
DEFINE_VECTORIZED_FN(to_unix_from_datetime_with_format_64);
DEFINE_VECTORIZED_FN(to_unix_from_datetime_with_format_32);

/**
* @param: [timestamp]
* @paramType columns: [TimestampColumn]
* @return BigIntColumn
*/
DEFINE_VECTORIZED_FN(to_unix_from_datetime);
DEFINE_VECTORIZED_FN(to_unix_from_datetime_64);
DEFINE_VECTORIZED_FN(to_unix_from_datetime_32);

/**
* @param: [date]
* @paramType columns: [DateColumn]
* @return BigIntColumn
*/
DEFINE_VECTORIZED_FN(to_unix_from_date);
DEFINE_VECTORIZED_FN(to_unix_from_date_64);
DEFINE_VECTORIZED_FN(to_unix_from_date_32);

/**
* @param: []
* @return ConstColumn
*/
DEFINE_VECTORIZED_FN(to_unix_for_now);
DEFINE_VECTORIZED_FN(to_unix_for_now_64);
DEFINE_VECTORIZED_FN(to_unix_for_now_32);

/**
* @param: [timestmap]
* @paramType columns: [IntColumn]
* @return BinaryColumn
*/
DEFINE_VECTORIZED_FN(from_unix_to_datetime);
DEFINE_VECTORIZED_FN(from_unix_to_datetime_64);
DEFINE_VECTORIZED_FN(from_unix_to_datetime_32);

// from_unix_datetime with format's auxiliary method
static Status from_unix_prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope);
Expand All @@ -574,7 +578,8 @@ class TimeFunctions {
* @paramType columns: [IntColumn, BinaryColumn]
* @return BinaryColumn
*/
DEFINE_VECTORIZED_FN(from_unix_to_datetime_with_format);
DEFINE_VECTORIZED_FN(from_unix_to_datetime_with_format_64);
DEFINE_VECTORIZED_FN(from_unix_to_datetime_with_format_32);

/**
* return number of seconds in this day.
Expand Down Expand Up @@ -618,6 +623,7 @@ class TimeFunctions {

static std::string convert_format(const Slice& format);

DEFINE_VECTORIZED_FN_TEMPLATE(_t_from_unix_with_format);
DEFINE_VECTORIZED_FN_TEMPLATE(_t_from_unix_with_format_general);

template <LogicalType TIMESTAMP_TYPE>
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
_runtime_state = std::make_shared<RuntimeState>(params.params.query_id, params.params.fragment_instance_id,
params.query_options, params.query_globals, _exec_env);
int func_version = params.__isset.func_version ? params.func_version
: TFunctionVersion::type::FUNC_VERSION_UNIX_TIMESTAMP_INT64;
: TFunctionVersion::type::RUNTIME_FILTER_SERIALIZE_VERSION_2;
_runtime_state->set_func_version(func_version);
_runtime_state->init_mem_trackers(_query_id);
_executor.set_runtime_state(_runtime_state.get());
Expand Down
16 changes: 8 additions & 8 deletions be/test/exprs/time_functions_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ TEST_F(TimeFunctionsTest, toUnixForNow) {
{
Columns columns;

ColumnPtr result = TimeFunctions::to_unix_for_now(_utils->get_fn_ctx(), columns).value();
ColumnPtr result = TimeFunctions::to_unix_for_now_32(_utils->get_fn_ctx(), columns).value();

ASSERT_TRUE(result->is_constant());

Expand All @@ -786,7 +786,7 @@ TEST_F(TimeFunctionsTest, toUnixFromDatetime) {

columns.emplace_back(tc1);

ColumnPtr result = TimeFunctions::to_unix_from_datetime(_utils->get_fn_ctx(), columns).value();
ColumnPtr result = TimeFunctions::to_unix_from_datetime_32(_utils->get_fn_ctx(), columns).value();

ASSERT_TRUE(result->is_numeric());

Expand All @@ -806,7 +806,7 @@ TEST_F(TimeFunctionsTest, toUnixFromDate) {

columns.emplace_back(tc1);

ColumnPtr result = TimeFunctions::to_unix_from_date(_utils->get_fn_ctx(), columns).value();
ColumnPtr result = TimeFunctions::to_unix_from_date_32(_utils->get_fn_ctx(), columns).value();

ASSERT_TRUE(result->is_numeric());

Expand All @@ -830,7 +830,7 @@ TEST_F(TimeFunctionsTest, toUnixFromDatetimeWithFormat) {
columns.emplace_back(tc1);
columns.emplace_back(tc2);

ColumnPtr result = TimeFunctions::to_unix_from_datetime_with_format(_utils->get_fn_ctx(), columns).value();
ColumnPtr result = TimeFunctions::to_unix_from_datetime_with_format_32(_utils->get_fn_ctx(), columns).value();

ASSERT_TRUE(result->is_numeric());

Expand All @@ -851,7 +851,7 @@ TEST_F(TimeFunctionsTest, fromUnixToDatetime) {

columns.emplace_back(tc1);

ColumnPtr result = TimeFunctions::from_unix_to_datetime(_utils->get_fn_ctx(), columns).value();
ColumnPtr result = TimeFunctions::from_unix_to_datetime_32(_utils->get_fn_ctx(), columns).value();

//ASSERT_TRUE(result->is_numeric());

Expand Down Expand Up @@ -884,7 +884,7 @@ TEST_F(TimeFunctionsTest, fromUnixToDatetimeWithFormat) {
FunctionContext::FunctionStateScope::FRAGMENT_LOCAL)
.ok());

ColumnPtr result = TimeFunctions::from_unix_to_datetime_with_format(_utils->get_fn_ctx(), columns).value();
ColumnPtr result = TimeFunctions::from_unix_to_datetime_with_format_32(_utils->get_fn_ctx(), columns).value();

//ASSERT_TRUE(result->is_numeric());

Expand Down Expand Up @@ -918,7 +918,7 @@ TEST_F(TimeFunctionsTest, fromUnixToDatetimeWithConstFormat) {
FunctionContext::FunctionStateScope::FRAGMENT_LOCAL)
.ok());

ColumnPtr result = TimeFunctions::from_unix_to_datetime_with_format(_utils->get_fn_ctx(), columns).value();
ColumnPtr result = TimeFunctions::from_unix_to_datetime_with_format_32(_utils->get_fn_ctx(), columns).value();

//ASSERT_TRUE(result->is_numeric());

Expand All @@ -943,7 +943,7 @@ TEST_F(TimeFunctionsTest, fromUnixToDatetimeWithConstFormat) {
ASSERT_TRUE(TimeFunctions::from_unix_prepare(_utils->get_fn_ctx(),
FunctionContext::FunctionStateScope::FRAGMENT_LOCAL)
.ok());
ColumnPtr result = TimeFunctions::from_unix_to_datetime_with_format(_utils->get_fn_ctx(), columns).value();
ColumnPtr result = TimeFunctions::from_unix_to_datetime_with_format_32(_utils->get_fn_ctx(), columns).value();
ASSERT_TRUE(result->only_null());
ASSERT_TRUE(TimeFunctions::from_unix_close(_utils->get_fn_ctx(),
FunctionContext::FunctionContext::FunctionStateScope::FRAGMENT_LOCAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1504,7 +1504,7 @@ private void toThriftForCommonParams(TExecPlanFragmentParams commonParams,
commonParams.setProtocol_version(InternalServiceVersion.V1);
commonParams.setFragment(fragment.toThrift());
commonParams.setDesc_tbl(descTable);
commonParams.setFunc_version(TFunctionVersion.FUNC_VERSION_UNIX_TIMESTAMP_INT64.getValue());
commonParams.setFunc_version(TFunctionVersion.RUNTIME_FILTER_SERIALIZE_VERSION_2.getValue());
commonParams.setCoord(coordAddress);

commonParams.setParams(new TPlanFragmentExecParams());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,23 +321,41 @@ public static ConstantOperator unixTimestamp(ConstantOperator arg) {
return ConstantOperator.createBigint(value);
}

@ConstantFunction(name = "from_unixtime", argTypes = {BIGINT}, returnType = VARCHAR)
@ConstantFunction.List(list = {
@ConstantFunction(name = "from_unixtime", argTypes = {INT}, returnType = VARCHAR),
@ConstantFunction(name = "from_unixtime", argTypes = {BIGINT}, returnType = VARCHAR)
})
public static ConstantOperator fromUnixTime(ConstantOperator unixTime) throws AnalysisException {
long value = unixTime.getBigint();
long value = 0;
if (unixTime.getType().isInt()) {
value = unixTime.getInt();
} else {
value = unixTime.getBigint();
}
if (value < 0 || value > TimeUtils.MAX_UNIX_TIMESTAMP) {
throw new AnalysisException("unixtime should larger than zero and less than " + TimeUtils.MAX_UNIX_TIMESTAMP);
throw new AnalysisException(
"unixtime should larger than zero and less than " + TimeUtils.MAX_UNIX_TIMESTAMP);
}
ConstantOperator dl = ConstantOperator.createDatetime(
LocalDateTime.ofInstant(Instant.ofEpochSecond(value), TimeUtils.getTimeZone().toZoneId()));
return ConstantOperator.createVarchar(dl.toString());
}

@ConstantFunction(name = "from_unixtime", argTypes = {BIGINT, VARCHAR}, returnType = VARCHAR)
@ConstantFunction.List(list = {
@ConstantFunction(name = "from_unixtime", argTypes = {INT, VARCHAR}, returnType = VARCHAR),
@ConstantFunction(name = "from_unixtime", argTypes = {BIGINT, VARCHAR}, returnType = VARCHAR)
})
public static ConstantOperator fromUnixTime(ConstantOperator unixTime, ConstantOperator fmtLiteral)
throws AnalysisException {
long value = unixTime.getBigint();
long value = 0;
if (unixTime.getType().isInt()) {
value = unixTime.getInt();
} else {
value = unixTime.getBigint();
}
if (value < 0 || value > TimeUtils.MAX_UNIX_TIMESTAMP) {
throw new AnalysisException("unixtime should larger than zero and less than " + TimeUtils.MAX_UNIX_TIMESTAMP);
throw new AnalysisException(
"unixtime should larger than zero and less than " + TimeUtils.MAX_UNIX_TIMESTAMP);
}
ConstantOperator dl = ConstantOperator.createDatetime(
LocalDateTime.ofInstant(Instant.ofEpochSecond(value), TimeUtils.getTimeZone().toZoneId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ public void testValuesNodePredicate() throws Exception {
assertPlanContains("select connection_id()", "<slot 2> : CONNECTION_ID()");
}

@Test
public void testFromUnixtime() throws Exception {
assertPlanContains("select from_unixtime(10)", "'1970-01-01 08:00:10'");
assertPlanContains("select from_unixtime(1024)", "'1970-01-01 08:17:04'");
assertPlanContains("select from_unixtime(32678)", "'1970-01-01 17:04:38'");
assertPlanContains("select from_unixtime(102400000)", "'1973-03-31 12:26:40'");
assertPlanContains("select from_unixtime(253402243100)", "'9999-12-31 15:58:20'");
}

@Test
public void testAggWithConstant() throws Exception {
assertPlanContains("select case when c1=1 then 1 end from (select '1' c1 union all select '2') a " +
Expand Down
Loading

0 comments on commit 17ad317

Please sign in to comment.