Skip to content

Commit ba7372f

Browse files
committed
feat(stream): support sub event in event window trigger
1 parent c97abb6 commit ba7372f

File tree

8 files changed

+307
-40
lines changed

8 files changed

+307
-40
lines changed

‎include/util/tobjpool.h‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ void taosObjListPopTail(SObjList *pList);
7575
void taosObjListPopTailEx(SObjList *pList, FDelete fp);
7676
void taosObjListPopObj(SObjList *pList, void *pObj);
7777
void taosObjListPopObjEx(SObjList *pList, void *pObj, FDelete fp);
78+
void taosObjListMoveBefore(SObjList *pList, void *pObj, void *pRefer);
7879

7980
void *taosObjListGetHead(SObjList *pList);
8081
void *taosObjListGetTail(SObjList *pList);

‎source/libs/new-stream/inc/streamInt.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void stmDestroySStreamInfo(void* param);
119119
int32_t streamBuildStateNotifyContent(ESTriggerEventType eventType, SColumnInfo* colInfo, const char* pFromState,
120120
const char* pToState, char** ppContent);
121121
int32_t streamBuildEventNotifyContent(const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t rowIdx,
122-
char** ppContent);
122+
int32_t condIdx, int32_t winIdx, char** ppContent);
123123
int32_t streamBuildBlockResultNotifyContent(const SStreamRunnerTask* pTask, const SSDataBlock* pBlock, char** ppContent,
124124
const SArray* pFields, const int32_t startRow, const int32_t endRow);
125125
int32_t streamSendNotifyContent(SStreamTask* pTask, const char* streamName, const char* tableName, int32_t triggerType,

‎source/libs/new-stream/inc/streamTriggerTask.h‎

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,22 @@ typedef struct SSTriggerRealtimeGroup {
7676
int64_t oldThreshold;
7777
int64_t newThreshold;
7878

79-
SValue stateVal; // for state window trigger
80-
int64_t pendingNullStart; // for state window trigger
81-
int32_t numPendingNull; // for state window trigger
82-
STimeWindow prevWindow; // the last closed window, for sliding trigger
83-
SObjList windows; // SObjList<SSTriggerWindow>, windows not yet closed
84-
SObjList pPendingCalcParams; // SObjList<SSTriggerCalcParam>
85-
SSHashObj *pDoneVersions; // SSHashObj<vgId, SObjList<{skey, ver}>>
79+
union {
80+
STimeWindow prevWindow; // the last closed window, for sliding trigger
81+
struct {
82+
SValue stateVal; // for state window trigger
83+
int64_t pendingNullStart; // for state window trigger
84+
int32_t numPendingNull; // for state window trigger
85+
};
86+
struct {
87+
SSTriggerNotifyWindow parentWindow; // for event window trigger with sub-event
88+
int32_t numSubWindows; // for event window trigger with sub-event
89+
int32_t conditionIdx; // for event window trigger with sub-event
90+
};
91+
};
92+
SObjList windows; // SObjList<SSTriggerWindow>, windows not yet closed
93+
SObjList pPendingCalcParams; // SObjList<SSTriggerCalcParam>
94+
SSHashObj *pDoneVersions; // SSHashObj<vgId, SObjList<{skey, ver}>>
8695

8796
int64_t nextExecTime; // used for max delay and batch window mode
8897
HeapNode heapNode; // used for max delay and batch window mode

‎source/libs/new-stream/src/streamTriggerTask.c‎

Lines changed: 233 additions & 28 deletions
Large diffs are not rendered by default.

‎source/libs/new-stream/src/streamUtil.c‎

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ int32_t streamBuildStateNotifyContent(ESTriggerEventType eventType, SColumnInfo*
464464
}
465465

466466
int32_t streamBuildEventNotifyContent(const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t rowIdx,
467-
char** ppContent) {
467+
int32_t condIdx, int32_t winIdx, char** ppContent) {
468468
int32_t code = TSDB_CODE_SUCCESS;
469469
int32_t lino = 0;
470470
const SNode* pNode = NULL;
@@ -486,14 +486,16 @@ int32_t streamBuildEventNotifyContent(const SSDataBlock* pInputBlock, const SNod
486486

487487
cond = cJSON_CreateObject();
488488
QUERY_CHECK_NULL(cond, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
489-
// todo(kjq): support condition index
490-
JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(0));
489+
JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(condIdx));
491490
JSON_CHECK_ADD_ITEM(cond, "fieldValues", fields);
492491
fields = NULL;
493492

494493
obj = cJSON_CreateObject();
495494
QUERY_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
496495
JSON_CHECK_ADD_ITEM(obj, "triggerCondition", cond);
496+
if (winIdx >= -1) {
497+
JSON_CHECK_ADD_ITEM(obj, "windowIndex", cJSON_CreateNumber(winIdx));
498+
}
497499
cond = NULL;
498500

499501
*ppContent = cJSON_PrintUnformatted(obj);

‎source/libs/parser/inc/sql.y‎

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,14 @@ trigger_type(A) ::= interval_opt(B) SLIDING NK_LP sliding_expr(C) NK_RP.
904904
trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH search_condition(B) END WITH search_condition(C) NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, B, C, D); }
905905
trigger_type(A) ::= COUNT_WINDOW NK_LP count_window_args(B) NK_RP. { A = createCountWindowNodeFromArgs(pCxt, B); }
906906
trigger_type(A) ::= PERIOD NK_LP interval_sliding_duration_literal(B) offset_opt(C) NK_RP. { A = createPeriodWindowNode(pCxt, releaseRawExprNode(pCxt, B), C); }
907+
trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH NK_LP search_condition_list(B) NK_RP
908+
END WITH search_condition(C) NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, createNodeListNode(pCxt, B), C, D); }
909+
trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH NK_LP search_condition_list(B) NK_RP NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, createNodeListNode(pCxt, B), NULL, D); }
910+
911+
%type search_condition_list { SNodeList* }
912+
%destructor search_condition_list { nodesDestroyList($$); }
913+
search_condition_list(A) ::= search_condition(B) NK_COMMA search_condition(C). { A = addNodeToList(pCxt, createNodeList(pCxt, B), C); }
914+
search_condition_list(A) ::= search_condition_list(B) NK_COMMA search_condition(C). { A = addNodeToList(pCxt, B, C); }
907915

908916
interval_opt(A) ::= . { A = NULL; }
909917
interval_opt(A) ::= INTERVAL NK_LP interval_sliding_duration_literal(C) NK_RP. { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, C), NULL, NULL, NULL); }

‎source/libs/parser/src/parTranslater.c‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14209,7 +14209,9 @@ static int32_t createStreamReqBuildTriggerEventWindow(STranslateContext* pCxt, S
1420914209
pReq->triggerType = WINDOW_TYPE_EVENT;
1421014210
PAR_ERR_RET(checkEventWindow(pCxt, pTriggerWindow));
1421114211
PAR_ERR_RET(nodesNodeToString(pTriggerWindow->pStartCond, false, (char**)&pReq->trigger.event.startCond, NULL));
14212-
PAR_ERR_RET(nodesNodeToString(pTriggerWindow->pEndCond, false, (char**)&pReq->trigger.event.endCond, NULL));
14212+
if (pTriggerWindow->pEndCond != NULL) {
14213+
PAR_ERR_RET(nodesNodeToString(pTriggerWindow->pEndCond, false, (char**)&pReq->trigger.event.endCond, NULL));
14214+
}
1421314215
pReq->trigger.event.trueForDuration = createStreamReqWindowGetBigInt(pTriggerWindow->pTrueForLimit);
1421414216
return TSDB_CODE_SUCCESS;
1421514217
}

‎source/util/src/tobjpool.c‎

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,46 @@ void taosObjListPopObjEx(SObjList *pList, void *pObj, FDelete fp) {
402402
taosObjListPopObj(pList, pObj);
403403
}
404404

405+
void taosObjListMoveBefore(SObjList *pList, void *pObj, void *pRefer) {
406+
if (pList == NULL || pObj == NULL || pRefer == NULL || pObj == pRefer) {
407+
return;
408+
}
409+
410+
SObjPool *pPool = pList->pPool;
411+
SObjPoolNode *pObjNode = TOBJPOOL_OBJ_GET_NODE(pObj);
412+
int64_t objIdx = TOBJPOOL_GET_IDX(pPool, pObjNode);
413+
if (objIdx < 0 || objIdx >= pPool->capacity) {
414+
return;
415+
}
416+
SObjPoolNode *pRefNode = TOBJPOOL_OBJ_GET_NODE(pRefer);
417+
int64_t refIdx = TOBJPOOL_GET_IDX(pPool, pRefNode);
418+
if (refIdx < 0 || refIdx >= pPool->capacity) {
419+
return;
420+
}
421+
422+
// remove pObjNode from current position
423+
if (pObjNode->prevIdx != TOBJPOOL_INVALID_IDX) {
424+
TOBJPOOL_GET_NODE(pPool, pObjNode->prevIdx)->nextIdx = pObjNode->nextIdx;
425+
} else {
426+
pList->headIdx = pObjNode->nextIdx;
427+
}
428+
if (pObjNode->nextIdx != TOBJPOOL_INVALID_IDX) {
429+
TOBJPOOL_GET_NODE(pPool, pObjNode->nextIdx)->prevIdx = pObjNode->prevIdx;
430+
} else {
431+
pList->tailIdx = pObjNode->prevIdx;
432+
}
433+
434+
// insert pObjNode before pRefNode
435+
if (pRefNode->prevIdx != TOBJPOOL_INVALID_IDX) {
436+
TOBJPOOL_GET_NODE(pPool, pRefNode->prevIdx)->nextIdx = objIdx;
437+
} else {
438+
pList->headIdx = objIdx;
439+
}
440+
pObjNode->prevIdx = pRefNode->prevIdx;
441+
pObjNode->nextIdx = refIdx;
442+
pRefNode->prevIdx = objIdx;
443+
}
444+
405445
void *taosObjListGetHead(SObjList *pList) {
406446
if (pList == NULL || pList->headIdx == TOBJPOOL_INVALID_IDX) {
407447
return NULL;

0 commit comments

Comments
 (0)