Skip to content
12 changes: 6 additions & 6 deletions include/common/tcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,8 @@ enum {
TMQ_MSG_TYPE__POLL_RAW_DATA_RSP,
};

static const char* const tmqMsgTypeStr[] = {
"data", "meta", "ask ep", "meta data", "wal info", "batch meta", "raw data"
};
static const char* const tmqMsgTypeStr[] = {"data", "meta", "ask ep", "meta data",
"wal info", "batch meta", "raw data"};

enum {
STREAM_INPUT__DATA_SUBMIT = 1,
Expand Down Expand Up @@ -441,9 +440,9 @@ typedef struct STUidTagInfo {
#define TABLE_NAME_COLUMN_INDEX 6
#define PRIMARY_KEY_COLUMN_INDEX 7

//steam get result block column
#define DATA_TS_COLUMN_INDEX 0
#define DATA_VERSION_COLUMN_INDEX 1
// steam get result block column
#define DATA_TS_COLUMN_INDEX 0
#define DATA_VERSION_COLUMN_INDEX 1

// stream create table block column
#define UD_TABLE_NAME_COLUMN_INDEX 0
Expand All @@ -458,6 +457,7 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol, char* likePat

#define TSMA_RES_STB_POSTFIX "_tsma_res_stb_"
#define MD5_OUTPUT_LEN 32
#define SHA1_OUTPUT_LEN 40
#define TSMA_RES_STB_EXTRA_COLUMN_NUM 4 // 3 columns: _wstart, _wend, _wduration, 1 tag: tbname

static inline bool isTsmaResSTb(const char* stbName) {
Expand Down
35 changes: 18 additions & 17 deletions include/libs/function/functionMgt.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_FIND_IN_SET,
FUNCTION_TYPE_LIKE_IN_SET,
FUNCTION_TYPE_REGEXP_IN_SET,

FUNCTION_TYPE_SHA1,

// conversion function
FUNCTION_TYPE_CAST = 2000,
Expand Down Expand Up @@ -178,19 +178,19 @@ typedef enum EFunctionType {
FUNCTION_TYPE_IROWTS_ORIGIN,
FUNCTION_TYPE_GROUP_ID,
FUNCTION_TYPE_IS_WINDOW_FILLED,
FUNCTION_TYPE_TPREV_TS, // _tprev_ts
FUNCTION_TYPE_TCURRENT_TS, // _tcurrent_ts
FUNCTION_TYPE_TNEXT_TS, // _tnext_ts
FUNCTION_TYPE_TWSTART, // _twstart
FUNCTION_TYPE_TWEND, // _twend
FUNCTION_TYPE_TWDURATION, // _twduration
FUNCTION_TYPE_TWROWNUM, // _twrownum
FUNCTION_TYPE_TPREV_LOCALTIME, // _tprev_localtime
FUNCTION_TYPE_TNEXT_LOCALTIME, // _tnext_localtime
FUNCTION_TYPE_TLOCALTIME, // _tlocaltime
FUNCTION_TYPE_TGRPID, // _tgrpid
FUNCTION_TYPE_PLACEHOLDER_COLUMN, // %%n
FUNCTION_TYPE_PLACEHOLDER_TBNAME, // %%tbname
FUNCTION_TYPE_TPREV_TS, // _tprev_ts
FUNCTION_TYPE_TCURRENT_TS, // _tcurrent_ts
FUNCTION_TYPE_TNEXT_TS, // _tnext_ts
FUNCTION_TYPE_TWSTART, // _twstart
FUNCTION_TYPE_TWEND, // _twend
FUNCTION_TYPE_TWDURATION, // _twduration
FUNCTION_TYPE_TWROWNUM, // _twrownum
FUNCTION_TYPE_TPREV_LOCALTIME, // _tprev_localtime
FUNCTION_TYPE_TNEXT_LOCALTIME, // _tnext_localtime
FUNCTION_TYPE_TLOCALTIME, // _tlocaltime
FUNCTION_TYPE_TGRPID, // _tgrpid
FUNCTION_TYPE_PLACEHOLDER_COLUMN, // %%n
FUNCTION_TYPE_PLACEHOLDER_TBNAME, // %%tbname
FUNCTION_TYPE_IMPUTATION_ROWTS,
FUNCTION_TYPE_IMPUTATION_MARK,
FUNCTION_TYPE_ANOMALY_MARK,
Expand Down Expand Up @@ -399,9 +399,10 @@ int32_t fmGetStreamPesudoFuncEnv(int32_t funcId, SNodeList* pParamNodes, SFuncEx

const void* fmGetStreamPesudoFuncVal(int32_t funcId, const SStreamRuntimeFuncInfo* pStreamRuntimeFuncInfo);

void fmGetStreamPesudoFuncValTbname(int32_t funcId, const SStreamRuntimeFuncInfo* pStreamRuntimeFuncInfo, void** data, int32_t* dataLen);
int32_t fmSetStreamPseudoFuncParamVal(int32_t funcId, SNodeList* pParamNodes, const SStreamRuntimeFuncInfo* pStreamRuntimeInfo);

void fmGetStreamPesudoFuncValTbname(int32_t funcId, const SStreamRuntimeFuncInfo* pStreamRuntimeFuncInfo, void** data,
int32_t* dataLen);
int32_t fmSetStreamPseudoFuncParamVal(int32_t funcId, SNodeList* pParamNodes,
const SStreamRuntimeFuncInfo* pStreamRuntimeInfo);

#ifdef __cplusplus
}
Expand Down
37 changes: 22 additions & 15 deletions include/libs/scalar/scalar.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,22 @@ int32_t scalarConvertOpValueNodeTs(SOperatorNode *node);
/*
pDst need to freed in caller
*/
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst, const void* pExtraParam, void* streamTsRange);
int32_t scalarCalculateInRange(SNode *pNode, SArray *pBlockList, SScalarParam *pDst, int32_t rowStartIdx, int32_t rowEndIdx, const void* pExtraParam, void* streamTsRange);
void sclFreeParam(SScalarParam* param);
int32_t scalarAssignPlaceHolderRes(SColumnInfoData* pResColData, int64_t offset, int64_t rows, int16_t funcId, const void* pExtraParams);
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst, const void *pExtraParam,
void *streamTsRange);
int32_t scalarCalculateInRange(SNode *pNode, SArray *pBlockList, SScalarParam *pDst, int32_t rowStartIdx,
int32_t rowEndIdx, const void *pExtraParam, void *streamTsRange);
void sclFreeParam(SScalarParam *param);
int32_t scalarAssignPlaceHolderRes(SColumnInfoData *pResColData, int64_t offset, int64_t rows, int16_t funcId,
const void *pExtraParams);
int32_t scalarGetOperatorParamNum(EOperatorType type);
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type, STypeMod typeMod, int8_t processType);

int32_t vectorGetConvertType(int32_t type1, int32_t type2);
int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow, int32_t startIndex, int32_t numOfRows);
int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, STypeMod typeMod, int32_t startIndex, int32_t numOfRows);
int32_t vectorGetConvertType(int32_t type1, int32_t type2);
int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow, int32_t startIndex,
int32_t numOfRows);
int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, STypeMod typeMod,
int32_t startIndex, int32_t numOfRows);
STypeMod getConvertTypeMod(int32_t type, const SColumnInfo *pCol1, const SColumnInfo *pCol2);
uint32_t base64BufSize(size_t inputLenBytes);

/* Math functions */
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
Expand Down Expand Up @@ -90,19 +94,21 @@ int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut
int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t md5Function(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOutput);
int32_t md5Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t shaFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t charFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t asciiFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t positionFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t trimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t replaceFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t repeatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t substrIdxFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t base64Function(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOutput);
int32_t crc32Function(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOutput);
int32_t findInSetFunction(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOutput);
int32_t likeInSetFunction(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOutput);
int32_t regexpInSetFunction(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOutput);
int32_t base64Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t base64FunctionFrom(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t crc32Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t findInSetFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t likeInSetFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t regexpInSetFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

/* Conversion functions */
int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
Expand Down Expand Up @@ -170,7 +176,8 @@ int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
// stream pseudo functions
int32_t streamPseudoScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

int32_t streamCalcCurrWinTimeRange(STimeRangeNode* node, void* pStRtFuncInfo, STimeWindow* pWinRange, bool* winRangeValid, int32_t type);
int32_t streamCalcCurrWinTimeRange(STimeRangeNode *node, void *pStRtFuncInfo, STimeWindow *pWinRange,
bool *winRangeValid, int32_t type);
int32_t scalarCalculateExtWinsTimeRange(STimeRangeNode *pNode, const void *pExtraParam, SExtWinTimeWindow *pWins);

#ifdef __cplusplus
Expand Down
8 changes: 7 additions & 1 deletion include/util/tbase64.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#define _TD_UTIL_BASE64_H_

#include "os.h"
#include "types.h"

#ifdef __cplusplus
extern "C" {
Expand All @@ -25,8 +26,13 @@ extern "C" {
int32_t base64_decode(const char *value, int32_t inlen, int32_t *outlen, uint8_t **result);
int32_t base64_encode(const uint8_t *value, int32_t vlen, char **result);

void tbase64_encode(uint8_t *out, const uint8_t *input, size_t in_len, VarDataLenT out_len);
int32_t tbase64_decode(uint8_t *out, const uint8_t *input, size_t in_len, VarDataLenT out_len);
uint32_t tbase64_encode_len(size_t in_len);
uint32_t tbase64_decode_len(size_t in_len);

#ifdef __cplusplus
}
#endif

#endif /*_TD_UTIL_BASE64_H_*/
#endif /*_TD_UTIL_BASE64_H_*/
26 changes: 26 additions & 0 deletions include/util/tsha.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#ifndef _TD_UTIL_SHA_H
#define _TD_UTIL_SHA_H

#include "os.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef struct {
uint32_t state[5];
uint32_t count[2];
unsigned char buffer[64];
} T_SHA1_CTX;

void tSHA1Transform(uint32_t state[5], const unsigned char buffer[64]);
void tSHA1Init(T_SHA1_CTX *context);
void tSHA1Update(T_SHA1_CTX *context, const unsigned char *data, uint32_t len);
void tSHA1Final(unsigned char digest[20], T_SHA1_CTX *context);
void tSHA1(char *hash_out, const char *str, uint32_t len);

#ifdef __cplusplus
}
#endif

#endif /*_TD_UTIL_SHA_H*/
18 changes: 15 additions & 3 deletions include/util/tutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "tdef.h"
#include "thash.h"
#include "tmd5.h"
#include "tsha.h"

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -215,17 +216,28 @@ static FORCE_INLINE int32_t taosHashBinary(char *pBuf, int32_t len) {

static FORCE_INLINE int32_t taosCreateMD5Hash(char *pBuf, int32_t len) {
T_MD5_CTX ctx;

tMD5Init(&ctx);
tMD5Update(&ctx, (uint8_t *)pBuf, len);
tMD5Final(&ctx);
char *p = pBuf;
int32_t resLen = 0;

return sprintf(pBuf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", ctx.digest[0], ctx.digest[1],
ctx.digest[2], ctx.digest[3], ctx.digest[4], ctx.digest[5], ctx.digest[6], ctx.digest[7],
ctx.digest[8], ctx.digest[9], ctx.digest[10], ctx.digest[11], ctx.digest[12], ctx.digest[13],
ctx.digest[14], ctx.digest[15]);
}

static FORCE_INLINE int32_t taosCreateSHA1Hash(char *pBuf, int32_t len) {
uint8_t result[21] = {0};

tSHA1((char *)result, pBuf, len);

return sprintf(pBuf, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", result[0],
result[1], result[2], result[3], result[4], result[5], result[6], result[7], result[8], result[9],
result[10], result[11], result[12], result[13], result[14], result[15], result[16], result[17],
result[18], result[19]);
}

static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, int32_t method, int32_t prefix,
int32_t suffix) {
if ((prefix == 0 && suffix == 0) || (tblen <= (prefix + suffix)) || (tblen <= -1 * (prefix + suffix)) ||
Expand Down Expand Up @@ -301,7 +313,7 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,

#define VND_CHECK_CODE(CODE, LINO, LABEL) TSDB_CHECK_CODE(CODE, LINO, LABEL)

#define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member)))
#define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member)))

#define TAOS_GET_TERRNO(code) (terrno == 0 ? code : terrno)

Expand Down
79 changes: 78 additions & 1 deletion source/libs/function/src/builtins.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "scalar.h"
#include "tanalytics.h"
#include "taoserror.h"
#include "tbase64.h"
#include "tglobal.h"
#include "ttypes.h"

Expand Down Expand Up @@ -1024,7 +1025,17 @@ static int32_t translateBase64(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));

SDataType* pRestType1 = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0));
int32_t outputLength = base64BufSize(pRestType1->bytes);
int32_t outputLength = tbase64_encode_len(pRestType1->bytes);

pFunc->node.resType = (SDataType){.bytes = outputLength, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
}

static int32_t translateBase64From(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));

SDataType* pRestType1 = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0));
int32_t outputLength = tbase64_decode_len(pRestType1->bytes);

pFunc->node.resType = (SDataType){.bytes = outputLength, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
Expand Down Expand Up @@ -1888,6 +1899,9 @@ static int32_t translateOutVarchar(SFunctionNode* pFunc, char* pErrBuf, int32_t
case FUNCTION_TYPE_MD5:
bytes = MD5_OUTPUT_LEN + VARSTR_HEADER_SIZE;
break;
case FUNCTION_TYPE_SHA1:
bytes = SHA1_OUTPUT_LEN + VARSTR_HEADER_SIZE;
break;
case FUNCTION_TYPE_USER:
case FUNCTION_TYPE_CURRENT_USER:
bytes = TSDB_USER_LEN;
Expand Down Expand Up @@ -6750,6 +6764,69 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
{
.name = "from_base64",
.type = FUNCTION_TYPE_BASE64,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.parameters = {.minParamNum = 1,
.maxParamNum = 1,
.paramInfoPattern = 1,
.inputParaInfo[0][0] = {.isLastParam = true,
.startParam = 1,
.endParam = 1,
.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE | FUNC_PARAM_SUPPORT_NCHAR_TYPE | FUNC_PARAM_SUPPORT_NULL_TYPE,
.validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE,
.paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE,
.valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,},
.outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE}},
.translateFunc = translateBase64From,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = base64FunctionFrom,
.finalizeFunc = NULL
},
{
.name = "sha",
.type = FUNCTION_TYPE_SHA1,
.classification = FUNC_MGT_SCALAR_FUNC,
.parameters = {.minParamNum = 1,
.maxParamNum = 1,
.paramInfoPattern = 1,
.inputParaInfo[0][0] = {.isLastParam = true,
.startParam = 1,
.endParam = 1,
.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE,
.validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE,
.paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE,
.valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,},
.outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE}},
.translateFunc = translateOutVarchar,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = shaFunction,
.finalizeFunc = NULL
},
{
.name = "sha1",
.type = FUNCTION_TYPE_SHA1,
.classification = FUNC_MGT_SCALAR_FUNC,
.parameters = {.minParamNum = 1,
.maxParamNum = 1,
.paramInfoPattern = 1,
.inputParaInfo[0][0] = {.isLastParam = true,
.startParam = 1,
.endParam = 1,
.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE,
.validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE,
.paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE,
.valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,},
.outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE}},
.translateFunc = translateOutVarchar,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = shaFunction,
.finalizeFunc = NULL
},
};
// clang-format on

Expand Down
Loading
Loading