Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/util/tobjpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ void taosObjListPopTail(SObjList *pList);
void taosObjListPopTailEx(SObjList *pList, FDelete fp);
void taosObjListPopObj(SObjList *pList, void *pObj);
void taosObjListPopObjEx(SObjList *pList, void *pObj, FDelete fp);
void taosObjListMoveBefore(SObjList *pList, void *pObj, void *pRefer);

void *taosObjListGetHead(SObjList *pList);
void *taosObjListGetTail(SObjList *pList);
Expand Down
2 changes: 1 addition & 1 deletion source/libs/new-stream/inc/streamInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void stmDestroySStreamInfo(void* param);
int32_t streamBuildStateNotifyContent(ESTriggerEventType eventType, SColumnInfo* colInfo, const char* pFromState,
const char* pToState, char** ppContent);
int32_t streamBuildEventNotifyContent(const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t rowIdx,
char** ppContent);
int32_t condIdx, int32_t winIdx, char** ppContent);
int32_t streamBuildBlockResultNotifyContent(const SStreamRunnerTask* pTask, const SSDataBlock* pBlock, char** ppContent,
const SArray* pFields, const int32_t startRow, const int32_t endRow);
int32_t streamSendNotifyContent(SStreamTask* pTask, const char* streamName, const char* tableName, int32_t triggerType,
Expand Down
23 changes: 16 additions & 7 deletions source/libs/new-stream/inc/streamTriggerTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,22 @@ typedef struct SSTriggerRealtimeGroup {
int64_t oldThreshold;
int64_t newThreshold;

SValue stateVal; // for state window trigger
int64_t pendingNullStart; // for state window trigger
int32_t numPendingNull; // for state window trigger
STimeWindow prevWindow; // the last closed window, for sliding trigger
SObjList windows; // SObjList<SSTriggerWindow>, windows not yet closed
SObjList pPendingCalcParams; // SObjList<SSTriggerCalcParam>
SSHashObj *pDoneVersions; // SSHashObj<vgId, SObjList<{skey, ver}>>
union {
STimeWindow prevWindow; // the last closed window, for sliding trigger
struct {
SValue stateVal; // for state window trigger
int64_t pendingNullStart; // for state window trigger
int32_t numPendingNull; // for state window trigger
};
struct {
SSTriggerNotifyWindow parentWindow; // for event window trigger with sub-event
int32_t numSubWindows; // for event window trigger with sub-event
int32_t conditionIdx; // for event window trigger with sub-event
};
};
SObjList windows; // SObjList<SSTriggerWindow>, windows not yet closed
SObjList pPendingCalcParams; // SObjList<SSTriggerCalcParam>
SSHashObj *pDoneVersions; // SSHashObj<vgId, SObjList<{skey, ver}>>

int64_t nextExecTime; // used for max delay and batch window mode
HeapNode heapNode; // used for max delay and batch window mode
Expand Down
261 changes: 233 additions & 28 deletions source/libs/new-stream/src/streamTriggerTask.c

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions source/libs/new-stream/src/streamUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ int32_t streamBuildStateNotifyContent(ESTriggerEventType eventType, SColumnInfo*
}

int32_t streamBuildEventNotifyContent(const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t rowIdx,
char** ppContent) {
int32_t condIdx, int32_t winIdx, char** ppContent) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
const SNode* pNode = NULL;
Expand All @@ -486,14 +486,16 @@ int32_t streamBuildEventNotifyContent(const SSDataBlock* pInputBlock, const SNod

cond = cJSON_CreateObject();
QUERY_CHECK_NULL(cond, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
// todo(kjq): support condition index
JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(0));
JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(condIdx));
JSON_CHECK_ADD_ITEM(cond, "fieldValues", fields);
fields = NULL;

obj = cJSON_CreateObject();
QUERY_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
JSON_CHECK_ADD_ITEM(obj, "triggerCondition", cond);
if (winIdx >= -1) {
JSON_CHECK_ADD_ITEM(obj, "windowIndex", cJSON_CreateNumber(winIdx));
}
cond = NULL;

*ppContent = cJSON_PrintUnformatted(obj);
Expand Down
8 changes: 8 additions & 0 deletions source/libs/parser/inc/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,14 @@ trigger_type(A) ::= interval_opt(B) SLIDING NK_LP sliding_expr(C) NK_RP.
trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH search_condition(B) END WITH search_condition(C) NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, B, C, D); }
trigger_type(A) ::= COUNT_WINDOW NK_LP count_window_args(B) NK_RP. { A = createCountWindowNodeFromArgs(pCxt, B); }
trigger_type(A) ::= PERIOD NK_LP interval_sliding_duration_literal(B) offset_opt(C) NK_RP. { A = createPeriodWindowNode(pCxt, releaseRawExprNode(pCxt, B), C); }
trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH NK_LP search_condition_list(B) NK_RP
END WITH search_condition(C) NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, createNodeListNode(pCxt, B), C, D); }
trigger_type(A) ::= EVENT_WINDOW NK_LP START WITH NK_LP search_condition_list(B) NK_RP NK_RP true_for_opt(D). { A = createEventWindowNode(pCxt, createNodeListNode(pCxt, B), NULL, D); }

%type search_condition_list { SNodeList* }
%destructor search_condition_list { nodesDestroyList($$); }
search_condition_list(A) ::= search_condition(B) NK_COMMA search_condition(C). { A = addNodeToList(pCxt, createNodeList(pCxt, B), C); }
search_condition_list(A) ::= search_condition_list(B) NK_COMMA search_condition(C). { A = addNodeToList(pCxt, B, C); }

interval_opt(A) ::= . { A = NULL; }
interval_opt(A) ::= INTERVAL NK_LP interval_sliding_duration_literal(C) NK_RP. { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, C), NULL, NULL, NULL); }
Expand Down
4 changes: 3 additions & 1 deletion source/libs/parser/src/parTranslater.c
Original file line number Diff line number Diff line change
Expand Up @@ -14209,7 +14209,9 @@ static int32_t createStreamReqBuildTriggerEventWindow(STranslateContext* pCxt, S
pReq->triggerType = WINDOW_TYPE_EVENT;
PAR_ERR_RET(checkEventWindow(pCxt, pTriggerWindow));
PAR_ERR_RET(nodesNodeToString(pTriggerWindow->pStartCond, false, (char**)&pReq->trigger.event.startCond, NULL));
PAR_ERR_RET(nodesNodeToString(pTriggerWindow->pEndCond, false, (char**)&pReq->trigger.event.endCond, NULL));
if (pTriggerWindow->pEndCond != NULL) {
PAR_ERR_RET(nodesNodeToString(pTriggerWindow->pEndCond, false, (char**)&pReq->trigger.event.endCond, NULL));
}
pReq->trigger.event.trueForDuration = createStreamReqWindowGetBigInt(pTriggerWindow->pTrueForLimit);
return TSDB_CODE_SUCCESS;
}
Expand Down
40 changes: 40 additions & 0 deletions source/util/src/tobjpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,46 @@ void taosObjListPopObjEx(SObjList *pList, void *pObj, FDelete fp) {
taosObjListPopObj(pList, pObj);
}

void taosObjListMoveBefore(SObjList *pList, void *pObj, void *pRefer) {
if (pList == NULL || pObj == NULL || pRefer == NULL || pObj == pRefer) {
return;
}

SObjPool *pPool = pList->pPool;
SObjPoolNode *pObjNode = TOBJPOOL_OBJ_GET_NODE(pObj);
int64_t objIdx = TOBJPOOL_GET_IDX(pPool, pObjNode);
if (objIdx < 0 || objIdx >= pPool->capacity) {
return;
}
SObjPoolNode *pRefNode = TOBJPOOL_OBJ_GET_NODE(pRefer);
int64_t refIdx = TOBJPOOL_GET_IDX(pPool, pRefNode);
if (refIdx < 0 || refIdx >= pPool->capacity) {
return;
}

// remove pObjNode from current position
if (pObjNode->prevIdx != TOBJPOOL_INVALID_IDX) {
TOBJPOOL_GET_NODE(pPool, pObjNode->prevIdx)->nextIdx = pObjNode->nextIdx;
} else {
pList->headIdx = pObjNode->nextIdx;
}
if (pObjNode->nextIdx != TOBJPOOL_INVALID_IDX) {
TOBJPOOL_GET_NODE(pPool, pObjNode->nextIdx)->prevIdx = pObjNode->prevIdx;
} else {
pList->tailIdx = pObjNode->prevIdx;
}

// insert pObjNode before pRefNode
if (pRefNode->prevIdx != TOBJPOOL_INVALID_IDX) {
TOBJPOOL_GET_NODE(pPool, pRefNode->prevIdx)->nextIdx = objIdx;
} else {
pList->headIdx = objIdx;
}
pObjNode->prevIdx = pRefNode->prevIdx;
pObjNode->nextIdx = refIdx;
pRefNode->prevIdx = objIdx;
}

void *taosObjListGetHead(SObjList *pList) {
if (pList == NULL || pList->headIdx == TOBJPOOL_INVALID_IDX) {
return NULL;
Expand Down
Loading