Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix: recored unfinished datablock
  • Loading branch information
Tony Zhang committed Oct 31, 2025
commit ebc20584c339440b4606eb4624aa11a04b9d1de9
13 changes: 12 additions & 1 deletion source/libs/executor/inc/executorInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,17 @@ static inline void resetNumNullRows(SWindowRowsSup* pRowSup) {
pRowSup->numNullRows = 0;
}

static inline void resetWindowRowsSup(SWindowRowsSup* pRowSup) {
if (NULL == pRowSup) {
return;
}

pRowSup->win.skey = pRowSup->win.ekey = 0;
pRowSup->prevTs = pRowSup->startRowIndex = 0;
pRowSup->numOfRows = pRowSup->groupId = 0;
resetNumNullRows(pRowSup);
}

typedef int32_t (*AggImplFn)(struct SOperatorInfo* pOperator, SSDataBlock* pBlock);

typedef struct SSessionAggOperatorInfo {
Expand All @@ -716,7 +727,7 @@ typedef struct SStateWindowOperatorInfo {
SExprSupp scalarSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
SColumn stateCol; // start row index
SColumn stateCol;
bool hasKey;
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
Expand Down
147 changes: 96 additions & 51 deletions source/libs/executor/src/timewindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1009,18 +1009,24 @@ void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, int32_t nullRowIndex) {
pRowSup->numNullRows += 1;
}

static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo, SWindowRowsSup* pRowSup, SExecTaskInfo* pTaskInfo,
SExprSupp* pSup, int32_t numOfOutput) {
// process a closed state window
// do aggregation on the tuples within the window
// partial aggregation results are stored in the output buffer
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
SWindowRowsSup* pRowSup, SExecTaskInfo* pTaskInfo,
SExprSupp* pSup, int32_t numOfOutput) {
int32_t code = 0;
int32_t lino = 0;
SResultRow* pResult = NULL;
code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, true, &pResult, pRowSup->groupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
QUERY_CHECK_CODE(code, lino, _return);

updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
pRowSup->startRowIndex, pRowSup->numOfRows, 0, numOfOutput);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
&pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, 0, numOfOutput);
QUERY_CHECK_CODE(code, lino, _return);

_return:
Expand All @@ -1030,11 +1036,15 @@ static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo, SWindow
return code;
}

static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t* startIndex,
int32_t* endIndex, int32_t* partialCalcIndex) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;

SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
printDataBlock(pBlock, "tooony", "tooony", 9898);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This printDataBlock call appears to be a leftover debug statement. It should be removed before merging to avoid polluting logs in production environments.

SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock,
pInfo->stateCol.slotId);
if (!pStateColInfoData) {
pTaskInfo->code = terrno;
T_LONG_JMP(pTaskInfo->env, terrno);
Expand All @@ -1050,15 +1060,12 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
TSKEY* tsList = (TSKEY*)pColInfoData->pData;

// pRowSup contains info of current state window
// pRowSup contains info of current unclosed state window
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
pRowSup->startRowIndex = 0;
resetNumNullRows(pRowSup);

struct SColumnDataAgg* pAgg = NULL;
EStateWinExtendOption extendOption = pInfo->extendOption;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
for (int32_t j = *startIndex; j < *endIndex; ++j) {
pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
doKeepStateWindowNullInfo(pRowSup, j);
Expand All @@ -1073,17 +1080,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
char* val = colDataGetData(pStateColInfoData, j);

if (gid != pRowSup->groupId || !pInfo->hasKey) {
// todo extract method
if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
if (IS_STR_DATA_BLOB(pInfo->stateKey.type)) {
blobDataCopy(pInfo->stateKey.pData, val);
} else {
varDataCopy(pInfo->stateKey.pData, val);
}
} else {
memcpy(pInfo->stateKey.pData, val, bytes);
}

assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
pInfo->hasKey = true;

doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid, &extendOption, false);
Expand All @@ -1093,33 +1090,36 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
} else {
// close and process current state window
doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
int32_t ret = processClosedStateWindow(pInfo, pRowSup, pTaskInfo, pSup, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo, pSup, numOfOutput);
if (TSDB_CODE_SUCCESS != code) {
T_LONG_JMP(pTaskInfo->env, code);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This is an extra blank line that can be removed to improve code conciseness.

// start a new state window
doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid, &extendOption, true);
doKeepTuple(pRowSup, tsList[j], j, gid);
*partialCalcIndex = pRowSup->startRowIndex;

// todo extract method
if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
if (IS_STR_DATA_BLOB(pInfo->stateKey.type)) {
blobDataCopy(pInfo->stateKey.pData, val);
} else {
varDataCopy(pInfo->stateKey.pData, val);
}
} else {
memcpy(pInfo->stateKey.pData, val, bytes);
}
assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
}
}

// if window hasn't been closed, process it now
doKeepCurStateWindowEndInfo(pRowSup, tsList, pBlock->info.rows, &extendOption, false);
int32_t ret = processClosedStateWindow(pInfo, pRowSup, pTaskInfo, pSup, numOfOutput);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
bool blockEndsWithNullStateCol = colDataIsNull(pStateColInfoData,
pBlock->info.rows, *endIndex - 1, pAgg);
if (blockEndsWithNullStateCol) {
// if the block ends with null state col,
// we do not process the current window here
// since we don't know the belonging of these null rows
} else {
doKeepCurStateWindowEndInfo(pRowSup, tsList, pBlock->info.rows, &extendOption, false);
int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo, pSup, numOfOutput);
if (TSDB_CODE_SUCCESS != code) {
T_LONG_JMP(pTaskInfo->env, code);
}
*partialCalcIndex = *endIndex;
// reset pRowSup after doing agg calculation
pRowSup->startRowIndex = 0;
pRowSup->numOfRows = 0;
}
}

Expand All @@ -1139,30 +1139,75 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {

SOperatorInfo* downstream = pOperator->pDownstream[0];
pInfo->cleanGroupResInfo = false;

SSDataBlock* pUnfinishedBlock = NULL;
int32_t startIndex = 0;
int32_t endIndex = 0;
int32_t numPartialCalcRows = 0;
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
if (pUnfinishedBlock != NULL) {
code = setInputDataBlock(pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end);

// handle the last unclosed window
doKeepCurStateWindowEndInfo(&pInfo->winSup,
(TSKEY*)((SColumnInfoData*)(taosArrayGet(pUnfinishedBlock->pDataBlock, pInfo->tsSlotId)))->pData,
pUnfinishedBlock->info.rows, &pInfo->extendOption, false);
code = processClosedStateWindow(pInfo, &pInfo->winSup, pTaskInfo, pSup,
pOperator->exprSupp.numOfExprs);
QUERY_CHECK_CODE(code, lino, _end);

blockDataDestroy(pUnfinishedBlock);
pUnfinishedBlock = NULL;
resetWindowRowsSup(&pInfo->winSup);
}
break;
}

startIndex = 0;
if (pUnfinishedBlock != NULL) {
startIndex = pUnfinishedBlock->info.rows;
// merge unfinished block with current block
code = blockDataMerge(pUnfinishedBlock, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
} else {
code = createOneDataBlock(pBlock, true, &pUnfinishedBlock);
QUERY_CHECK_CODE(code, lino, _end);
}
endIndex = pUnfinishedBlock->info.rows;

pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
code = setInputDataBlock(pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
QUERY_CHECK_CODE(code, lino, _end);

code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
QUERY_CHECK_CODE(code, lino, _end);

// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code =
projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo,
pUnfinishedBlock, pUnfinishedBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL,
GET_STM_RTINFO(pOperator->pTaskInfo));
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
}

doStateWindowAggImpl(pOperator, pInfo, pBlock);
doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock, &startIndex, &endIndex,
&numPartialCalcRows);
if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
// save unfinished block for next round processing
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
QUERY_CHECK_NULL(pUnfinishedBlock, code, lino, _end, terrno);
pInfo->winSup.startRowIndex -= numPartialCalcRows;
} else {
blockDataDestroy(pUnfinishedBlock);
pUnfinishedBlock = NULL;
}
}

pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
Expand All @@ -1188,8 +1233,8 @@ static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe

int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStateWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStateWindowOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;

code = pOperator->fpSet._openFn(pOperator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@ def prepare_data(self):
tdSql.execute("create database if not exists testdb", show=True)
tdSql.execute("use testdb")
values = """
('2025-09-01 10:00:00', null, 20), \
('2025-09-01 10:00:01', 'a', 23.5), \
('2025-09-01 10:00:02', 'a', 25.9), \
('2025-09-01 10:02:15', null, 26), \
('2025-09-01 10:02:45', 'a', 28), \
('2025-09-01 10:04:00', null, 24.3), \
('2025-09-01 10:05:00', null, null), \
('2025-09-01 11:01:10', 'b', 18), \
('2025-09-01 12:03:22', 'b', 14.4), \
('2025-09-01 12:20:19', 'a', 17.7), \
('2025-09-01 13:00:00', 'a', null), \
('2025-09-01 14:00:00', null, 22.3), \
('2025-09-01 18:18:18', 'b', 18.18), \
('2025-09-01 20:00:00', 'b', 19.5), \
('2025-09-01 10:00:00', null, 20),
('2025-09-01 10:00:01', 'a', 23.5),
('2025-09-01 10:00:02', 'a', 25.9),
('2025-09-01 10:02:15', null, 26),
('2025-09-01 10:02:45', 'a', 28),
('2025-09-01 10:04:00', null, 24.3),
('2025-09-01 10:05:00', null, null),
('2025-09-01 11:01:10', 'b', 18),
('2025-09-01 12:03:22', 'b', 14.4),
('2025-09-01 12:20:19', 'a', 17.7),
('2025-09-01 13:00:00', 'a', null),
('2025-09-01 14:00:00', null, 22.3),
('2025-09-01 18:18:18', 'b', 18.18),
('2025-09-01 20:00:00', 'b', 19.5),
('2025-09-02 08:00:00', null, 9.9)
"""
# normal table
Expand Down Expand Up @@ -590,11 +590,11 @@ def check_stream_computing_normal_table(self):

stream = StreamItem (
id=2,
stream="create stream scn2 count_window(5) from ntb into res_scn2 as \
select _wstart, _wduration, _wend, _twstart, _twend, \
count(*) cnt_all, count(s) cnt_s, count(v) cnt_v, avg(v) avg_v \
from ntb where ts >= _twstart and ts <= _twend \
state_window(s, 2)",
stream='''create stream scn2 count_window(5) from ntb into res_scn2 as
select _wstart, _wduration, _wend, _twstart, _twend,
count(*) cnt_all, count(s) cnt_s, count(v) cnt_v, avg(v) avg_v
from ntb where ts >= _twstart and ts <= _twend
state_window(s, 2)''',
res_query="select _wstart, _wduration, _wend, cnt_all, cnt_s, cnt_v, avg_v from res_scn2 \
where _wend <= '2025-09-02 08:00:04.000'",
exp_query="select _wstart, _wduration, _wend, count(*), count(s), count(v), avg(v) from ntb \
Expand Down
Loading