Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: greatest func #30023

Open
wants to merge 2 commits into
base: 3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: greatest func
  • Loading branch information
facetosea committed Mar 5, 2025
commit d56001981b4f765109495852bd976d63ac82ec0b
1 change: 1 addition & 0 deletions include/common/tglobal.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ extern bool tsStreamCoverage;
extern int8_t tsS3EpNum;
extern int32_t tsStreamNotifyMessageSize;
extern int32_t tsStreamNotifyFrameSize;
extern bool tsTransToStrWhenMixTypeInLeast;

extern bool tsExperimental;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
Expand Down
3 changes: 3 additions & 0 deletions include/common/ttypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ typedef struct {
#define IS_STR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))

#define IS_COMPARE_STR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR))

#define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX)
#define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX)
#define IS_VALID_INT(_t) ((_t) >= INT32_MIN && (_t) <= INT32_MAX)
Expand Down
2 changes: 2 additions & 0 deletions include/libs/function/functionMgt.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ typedef enum EFunctionType {
FUNCTION_TYPE_DEGREES,
FUNCTION_TYPE_RADIANS,
FUNCTION_TYPE_TRUNCATE,
FUNCTION_TYPE_GREATEST,
FUNCTION_TYPE_LEAST,

// string function
FUNCTION_TYPE_LENGTH = 1500,
Expand Down
2 changes: 2 additions & 0 deletions include/libs/scalar/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode *
SNode **pOtherCond);
int32_t filterIsMultiTableColsCond(SNode *pCond, bool *res);
EConditionType filterClassifyCondition(SNode *pNode);
int32_t filterGetCompFunc(__compar_fn_t *func, int32_t type, int32_t optr);
bool filterDoCompare(__compar_fn_t func, uint8_t optr, void *left, void *right);

#ifdef __cplusplus
}
Expand Down
3 changes: 3 additions & 0 deletions include/libs/scalar/scalar.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type, int8_

int32_t vectorGetConvertType(int32_t type1, int32_t type2);
int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow, int32_t startIndex, int32_t numOfRows);
int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, int32_t startIndex, int32_t numOfRows);

/* Math functions */
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
Expand Down Expand Up @@ -71,6 +72,8 @@ int32_t signFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int32_t degreesFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t radiansFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t randFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t greatestFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t leastFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

/* String functions */
int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
Expand Down
10 changes: 9 additions & 1 deletion source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ uint32_t tsEncryptionKeyChksum = 0;
int8_t tsEncryptionKeyStat = ENCRYPT_KEY_STAT_UNSET;
int8_t tsGrant = 1;

bool tsTransToStrWhenMixTypeInLeast = true;

// monitor
bool tsEnableMonitor = true;
int32_t tsMonitorInterval = 30;
Expand Down Expand Up @@ -748,6 +750,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {

TAOS_CHECK_RETURN(
cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));

TAOS_CHECK_RETURN(cfgAddBool(pCfg, "transToStrWhenMixTypeInLeast", tsTransToStrWhenMixTypeInLeast, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT,CFG_CATEGORY_LOCAL));

TAOS_RETURN(TSDB_CODE_SUCCESS);
}
Expand Down Expand Up @@ -1481,6 +1485,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamCoverage");
tsStreamCoverage = pItem->bval;

TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "transToStrWhenMixTypeInLeast");
tsTransToStrWhenMixTypeInLeast = pItem->bval;

TAOS_RETURN(TSDB_CODE_SUCCESS);
}

Expand Down Expand Up @@ -2780,7 +2787,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"numOfRpcSessions", &tsNumOfRpcSessions},
{"bypassFlag", &tsBypassFlag},
{"safetyCheckLevel", &tsSafetyCheckLevel},
{"streamCoverage", &tsStreamCoverage}};
{"streamCoverage", &tsStreamCoverage},
{"transToStrWhenMixTypeInLeast", &tsTransToStrWhenMixTypeInLeast}};

if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);
Expand Down
66 changes: 66 additions & 0 deletions source/libs/function/src/builtins.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
#include "tanalytics.h"
#include "taoserror.h"
#include "ttime.h"
#include "functionMgt.h"
#include "ttypes.h"
#include "tglobal.h"

static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) {
va_list vArgList;
Expand Down Expand Up @@ -1745,6 +1748,49 @@ static int32_t translateHistogramPartial(SFunctionNode* pFunc, char* pErrBuf, in
return TSDB_CODE_SUCCESS;
}

static int32_t translateGreatestleast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (LIST_LENGTH(pFunc->pParameterList) < 2) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}

bool mixTypeToStrings = tsTransToStrWhenMixTypeInLeast;

SDataType res = {.type = 0};
for (int32_t i = 0; i < LIST_LENGTH(pFunc->pParameterList); i++) {
SDataType* para = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, i));

if (IS_NULL_TYPE(para->type)) {
res.type = TSDB_DATA_TYPE_NULL;
break;
} else if (IS_MATHABLE_TYPE(para->type)) {
if(res.type == 0) {
res.type = para->type;
res.bytes = para->bytes;
} else if(IS_MATHABLE_TYPE(res.type) || !mixTypeToStrings) {
int32_t resType = vectorGetConvertType(res.type, para->type);
res.type = resType == 0 ? res.type : resType;
res.bytes = tDataTypes[resType].bytes;
}
} else if (IS_COMPARE_STR_DATA_TYPE(para->type)) {
if(res.type == 0) {
res.type = para->type;
res.bytes = para->bytes;
} else if(IS_COMPARE_STR_DATA_TYPE(res.type)) {
int32_t resType = vectorGetConvertType(res.type, para->type);
res.type = resType == 0 ? res.type : resType;
res.bytes = TMAX(res.bytes, para->bytes);
} else if(mixTypeToStrings) { // res.type is mathable type
res.type = para->type;
res.bytes = para->bytes;
}
} else {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
}
pFunc->node.resType = res;
return TSDB_CODE_SUCCESS;
}

// clang-format off
const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
Expand Down Expand Up @@ -5656,6 +5702,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "cols",
.translateFunc = invalidColsFunction,
},
{
.name = "greatest",
.type = FUNCTION_TYPE_GREATEST,
.classification = FUNC_MGT_SCALAR_FUNC,
.translateFunc = translateGreatestleast,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = greatestFunction,
.finalizeFunc = NULL
},
{
.name = "least",
.type = FUNCTION_TYPE_LEAST,
.classification = FUNC_MGT_SCALAR_FUNC,
.translateFunc = translateGreatestleast,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = leastFunction,
.finalizeFunc = NULL
},
};
// clang-format on

Expand Down
133 changes: 133 additions & 0 deletions source/libs/scalar/src/sclfunc.c
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#include <stdint.h>
#include "cJSON.h"
#include "function.h"
#include "scalar.h"
#include "sclInt.h"
#include "sclvector.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tjson.h"
#include "ttime.h"
#include "filter.h"

typedef float (*_float_fn)(float);
typedef float (*_float_fn_2)(float, float);
Expand Down Expand Up @@ -4403,3 +4406,133 @@ int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
return selectScalarFunction(pInput, inputNum, pOutput);
}

typedef struct SCovertScarlarParam {
SScalarParam covertParam;
SScalarParam *param;
bool converted;
} SCovertScarlarParam;

void freeSCovertScarlarParams(SCovertScarlarParam *pCovertParams, int32_t num) {
if (pCovertParams == NULL) {
return;
}
for (int32_t i = 0; i < num; i++) {
if (pCovertParams[i].converted) {
sclFreeParam(pCovertParams[i].param);
}
}
taosMemoryFree(pCovertParams);
}

static int32_t vectorCompareAndSelect(SCovertScarlarParam *pParams, int32_t numOfRows, int numOfCols,
int32_t *resultColIndex, EOperatorType optr) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t type = GET_PARAM_TYPE(pParams[0].param);

__compar_fn_t fp = NULL;
code = filterGetCompFunc(&fp, type, optr);
if(code != TSDB_CODE_SUCCESS) {
qError("failed to get compare function, func:%s type:%d, optr:%d", __FUNCTION__, type, optr);
return code;
}

for (int32_t i = 0; i < numOfRows; i++) {
int selectIndex = 0;
if (colDataIsNull_s(pParams[selectIndex].param->columnData, i)) {
resultColIndex[i] = -1;
continue;
}
for (int32_t j = 1; j < numOfCols; j++) {
if (colDataIsNull_s(pParams[j].param->columnData, i)) {
resultColIndex[i] = -1;
break;
} else {
int32_t leftRowNo = pParams[selectIndex].param->numOfRows == 1 ? 0 : i;
int32_t rightRowNo = pParams[j].param->numOfRows == 1 ? 0 : i;
char *pLeftData = colDataGetData(pParams[selectIndex].param->columnData, leftRowNo);
char *pRightData = colDataGetData(pParams[j].param->columnData, rightRowNo);
bool pRes = filterDoCompare(fp, optr, pLeftData, pRightData);
if (!pRes) {
selectIndex = j;
}
}
resultColIndex[i] = selectIndex;
}
}

return code;
}

static int32_t greatestLeastImpl(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, EOperatorType order) {
int32_t code = TSDB_CODE_SUCCESS;
SColumnInfoData *pOutputData = pOutput[0].columnData;
int16_t outputType = GET_PARAM_TYPE(&pOutput[0]);
int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]);

SCovertScarlarParam *pCovertParams = NULL;
int32_t *resultColIndex = NULL;

int32_t numOfRows = 0;
bool IsNullType = false;
// If any column is NULL type, the output is NULL type
for (int32_t i = 0; i < inputNum; i++) {
if (numOfRows != 0 && numOfRows != pInput[i].numOfRows && pInput[i].numOfRows != 1 && numOfRows != 1) {
qError("input rows not match, func:%s, rows:%d, %d", __FUNCTION__, numOfRows, pInput[i].numOfRows);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto _return;
}
numOfRows = TMAX(numOfRows, pInput[i].numOfRows);
IsNullType |= IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[i]));
}

if (IsNullType) {
colDataSetNNULL(pOutputData, 0, numOfRows);
pOutput->numOfRows = numOfRows;
return TSDB_CODE_SUCCESS;
}
pCovertParams = taosMemoryMalloc(inputNum * sizeof(SCovertScarlarParam));
for (int32_t j = 0; j < inputNum; j++) {
SScalarParam *pParam = &pInput[j];
int16_t oldType = GET_PARAM_TYPE(&pInput[j]);
if (oldType != outputType) {
pCovertParams[j].covertParam = (SScalarParam){0};
setTzCharset(&pCovertParams[j].covertParam, pParam->tz, pParam->charsetCxt);
SCL_ERR_JRET(vectorConvertSingleCol(pParam, &pCovertParams[j].covertParam, outputType, 0, pParam->numOfRows));
pCovertParams[j].param = &pCovertParams[j].covertParam;
pCovertParams[j].converted = true;
} else {
pCovertParams[j].param = pParam;
pCovertParams[j].converted = false;
}
}

resultColIndex = taosMemoryCalloc(numOfRows, sizeof(int32_t));
SCL_ERR_JRET(vectorCompareAndSelect(pCovertParams, numOfRows, inputNum, resultColIndex, order));

for (int32_t i = 0; i < numOfRows; i++) {
int32_t index = resultColIndex[i];
if (index == -1) {
colDataSetNULL(pOutputData, i);
continue;
}
int32_t rowNo = pCovertParams[index].param->numOfRows == 1 ? 0 : i;
char *data = colDataGetData(pCovertParams[index].param->columnData, rowNo);
SCL_ERR_JRET(colDataSetVal(pOutputData, i, data, false));
}

pOutput->numOfRows = numOfRows;

_return:
freeSCovertScarlarParams(pCovertParams, inputNum);
taosMemoryFree(resultColIndex);
return code;
}

int32_t greatestFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return greatestLeastImpl(pInput, inputNum, pOutput, OP_TYPE_GREATER_THAN);
}

int32_t leastFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return greatestLeastImpl(pInput, inputNum, pOutput, OP_TYPE_LOWER_THAN);
}

4 changes: 2 additions & 2 deletions source/libs/scalar/src/sclvector.c
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut,
}

int8_t gConvertTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = {
/*NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/* NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BOOL*/ 0, 0, 2, 3, 4, 5, 6, 7, 5, 9, 5, 11, 12, 13, 14, 0, -1, 0, 0, 0, -1,
/*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 5, 9, 5, 3, 4, 5, 7, 0, -1, 0, 0, 0, -1,
Expand All @@ -1021,7 +1021,7 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = {
};

int8_t gDisplyTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = {
/*NULL BOOL TINY SMAL INT BIGI FLOA DOUB VARC TIM NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/* NULL BOOL TINY SMAL INT BIGI FLOA DOUB VARC TIM NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/*NULL*/ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, -1, -1, -1, 20,
/*BOOL*/ 0, 1, 2, 3, 4, 5, 6, 7, 8, 5, 10, 11, 12, 13, 14, -1, -1, -1, -1, -1, -1,
/*TINY*/ 0, 0, 2, 3, 4, 5, 8, 8, 8, 5, 10, 3, 4, 5, 8, -1, -1, -1, -1, -1, -1,
Expand Down
2 changes: 1 addition & 1 deletion tools/shell/src/shellEngine.c
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*.7e", width, GET_FLOAT_VAL(val));
} else {
n = snprintf(buf, LENGTH, "%*.*g", width, FLT_DIG, GET_FLOAT_VAL(val));
if (n > SHELL_FLOAT_WIDTH) {
if (n > width) {
printf("%*.7e", width, GET_FLOAT_VAL(val));
} else {
printf("%s", buf);
Expand Down
Loading