19 changes: 19 additions & 0 deletions pkg/ddl/backfilling_read_index.go
@@ -152,9 +152,28 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
return r.bc.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

<<<<<<< HEAD
func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {
return &execute.SubtaskSummary{
RowCount: r.curRowCount.Load(),
=======
func (r *readIndexStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
logutil.DDLLogger().Info("read index executor run subtask",
zap.Bool("use cloud", r.isGlobalSort()))

r.summaryMap.Store(subtask.ID, &readIndexSummary{
metaGroups: make([]*external.SortedKVMeta, len(r.indexes)),
})
var err error
failpoint.InjectCall("beforeReadIndexStepExecRunSubtask", &err)
if err != nil {
return err
}

sm, err := decodeBackfillSubTaskMeta(ctx, r.cloudStorageURI, subtask.Meta)
if err != nil {
return err
>>>>>>> 968e31fc3fe (ddl: cancel the job context before rolling back (#64130))
}
}

153 changes: 136 additions & 17 deletions pkg/ddl/index.go
@@ -2862,24 +2862,12 @@ func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgIn
select {
case <-done:
w.updateDistTaskRowCount(taskKey, reorgInfo.Job.ID)
return nil
err := w.checkRunnableOrHandlePauseOrCanceled(stepCtx, taskKey)
return errors.Trace(err)
case <-checkFinishTk.C:
if err = w.isReorgRunnable(stepCtx, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
if err = handle.PauseTask(w.workCtx, taskKey); err != nil {
logutil.DDLLogger().Error("pause task error", zap.String("task_key", taskKey), zap.Error(err))
continue
}
failpoint.InjectCall("syncDDLTaskPause")
}
if !dbterror.ErrCancelledDDLJob.Equal(err) {
return errors.Trace(err)
}
if err = handle.CancelTask(w.workCtx, taskKey); err != nil {
logutil.DDLLogger().Error("cancel task error", zap.String("task_key", taskKey), zap.Error(err))
// continue to cancel task.
continue
}
err := w.checkRunnableOrHandlePauseOrCanceled(stepCtx, taskKey)
if err != nil {
return errors.Trace(err)
}
case <-updateRowCntTk.C:
w.updateDistTaskRowCount(taskKey, reorgInfo.Job.ID)
@@ -2890,6 +2878,137 @@ func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgIn
return err
}

<<<<<<< HEAD
=======
func (w *worker) checkRunnableOrHandlePauseOrCanceled(stepCtx context.Context, taskKey string) (err error) {
if err = w.isReorgRunnable(stepCtx, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
if err = handle.PauseTask(w.workCtx, taskKey); err != nil {
logutil.DDLLogger().Warn("pause task error", zap.String("task_key", taskKey), zap.Error(err))
return nil
}
failpoint.InjectCall("syncDDLTaskPause")
}
if !dbterror.ErrCancelledDDLJob.Equal(err) {
return errors.Trace(err)
}
if err = handle.CancelTask(w.workCtx, taskKey); err != nil {
logutil.DDLLogger().Warn("cancel task error", zap.String("task_key", taskKey), zap.Error(err))
return nil
}
}
return nil
}

// Note: we could achieve the same effect by calling ModifyTaskByID directly inside
// the processing of 'ADMIN ALTER DDL JOB xxx', which would let us eliminate this
// goroutine, but if the task hasn't been created yet we must make sure it is
// created with the config set by ALTER DDL JOB. A possible solution is to make
// the DXF task submission and 'ADMIN ALTER DDL JOB xxx' txns conflict with each
// other when they overlap in time, by modifying the job in the same txn that
// submits the task, as we are using optimistic txn. But that would cause WRITE
// CONFLICT with the outer txn in transitOneJobStep.
func modifyTaskParamLoop(
ctx context.Context,
sysTblMgr systable.Manager,
taskManager storage.Manager,
done chan struct{},
jobID, taskID int64,
lastConcurrency, lastBatchSize, lastMaxWriteSpeed int,
) {
logger := logutil.DDLLogger().With(zap.Int64("jobID", jobID), zap.Int64("taskID", taskID))
ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
defer ticker.Stop()
for {
select {
case <-done:
return
case <-ticker.C:
}

latestJob, err := sysTblMgr.GetJobByID(ctx, jobID)
if err != nil {
if goerrors.Is(err, systable.ErrNotFound) {
logger.Info("job not found, might already finished")
return
}
logger.Error("get job failed, will retry later", zap.Error(err))
continue
}

modifies := make([]proto.Modification, 0, 3)
workerCntLimit := latestJob.ReorgMeta.GetConcurrency()
concurrency, err := adjustConcurrency(ctx, taskManager, workerCntLimit)
if err != nil {
logger.Error("adjust concurrency failed", zap.Error(err))
continue
}
if concurrency != lastConcurrency {
modifies = append(modifies, proto.Modification{
Type: proto.ModifyConcurrency,
To: int64(concurrency),
})
}
batchSize := latestJob.ReorgMeta.GetBatchSize()
if batchSize != lastBatchSize {
modifies = append(modifies, proto.Modification{
Type: proto.ModifyBatchSize,
To: int64(batchSize),
})
}
maxWriteSpeed := latestJob.ReorgMeta.GetMaxWriteSpeed()
if maxWriteSpeed != lastMaxWriteSpeed {
modifies = append(modifies, proto.Modification{
Type: proto.ModifyMaxWriteSpeed,
To: int64(maxWriteSpeed),
})
}
if len(modifies) == 0 {
continue
}
currTask, err := taskManager.GetTaskByID(ctx, taskID)
if err != nil {
if goerrors.Is(err, storage.ErrTaskNotFound) {
logger.Info("task not found, might already finished")
return
}
logger.Error("get task failed, will retry later", zap.Error(err))
continue
}
if !currTask.State.CanMoveToModifying() {
// user might modify param again while another modify is ongoing.
logger.Info("task state is not suitable for modifying, will retry later",
zap.String("state", currTask.State.String()))
continue
}
if err = taskManager.ModifyTaskByID(ctx, taskID, &proto.ModifyParam{
PrevState: currTask.State,
Modifications: modifies,
}); err != nil {
logger.Error("modify task failed", zap.Error(err))
continue
}
logger.Info("modify task success",
zap.Int("oldConcurrency", lastConcurrency), zap.Int("newConcurrency", concurrency),
zap.Int("oldBatchSize", lastBatchSize), zap.Int("newBatchSize", batchSize),
zap.String("oldMaxWriteSpeed", units.HumanSize(float64(lastMaxWriteSpeed))),
zap.String("newMaxWriteSpeed", units.HumanSize(float64(maxWriteSpeed))),
)
lastConcurrency = concurrency
lastBatchSize = batchSize
lastMaxWriteSpeed = maxWriteSpeed
}
}

func adjustConcurrency(ctx context.Context, taskMgr storage.Manager, workerCnt int) (int, error) {
cpuCount, err := taskMgr.GetCPUCountOfNode(ctx)
if err != nil {
return 0, err
}
return min(workerCnt, cpuCount), nil
}

>>>>>>> 968e31fc3fe (ddl: cancel the job context before rolling back (#64130))
// EstimateTableRowSizeForTest is used for test.
var EstimateTableRowSizeForTest = estimateTableRowSize

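The modifyTaskParamLoop added above is a ticker-driven reconciliation loop: on every tick it reads the latest reorg meta, compares it with the values last applied to the DXF task, and only calls ModifyTaskByID when something changed, retrying on a later tick after any error. Below is a minimal, self-contained sketch of that shape; pollParam, getLatest, and apply are illustrative stand-ins (not TiDB APIs) for the systable/taskManager calls in the real loop.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// pollParam mirrors the ticker-plus-done shape of modifyTaskParamLoop: wake on
// an interval, read the latest desired value, and only act when it differs
// from what was last applied.
func pollParam(done <-chan struct{}, getLatest func() int64, apply func(int64) error) {
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
	last := getLatest()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
		}
		cur := getLatest()
		if cur == last {
			continue // nothing changed; wait for the next tick
		}
		if err := apply(cur); err != nil {
			continue // keep `last` unchanged so the change is retried later
		}
		last = cur
	}
}

func main() {
	var concurrency atomic.Int64
	concurrency.Store(4)
	done := make(chan struct{})
	go pollParam(done,
		concurrency.Load,
		func(v int64) error { fmt.Println("modify task concurrency ->", v); return nil })
	time.Sleep(30 * time.Millisecond)
	concurrency.Store(8) // simulate the user raising concurrency via ADMIN ALTER DDL JOB
	time.Sleep(50 * time.Millisecond)
	close(done)
}

Keeping `last` unchanged on error means the change is simply retried on a later tick, which matches the `continue` branches in the real loop.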
18 changes: 14 additions & 4 deletions pkg/ddl/job_worker.go
@@ -127,15 +127,17 @@ func (c *jobContext) initStepCtx() {
}
}

func (c *jobContext) cleanStepCtx() {
func (c *jobContext) cleanStepCtx(cause error) {
// reorgTimeoutOccurred indicates whether the current reorg process
// exited temporarily due to a timeout condition. When set to true,
// it prevents premature cleanup of the step context.
if c.reorgTimeoutOccurred {
if cause == context.Canceled && c.reorgTimeoutOccurred {
c.reorgTimeoutOccurred = false // reset flag
return
}
c.stepCtxCancel(context.Canceled)
if c.stepCtxCancel != nil {
c.stepCtxCancel(cause)
}
c.stepCtx = nil // unset stepCtx for the next step initialization
}

@@ -865,6 +867,14 @@ func (w *worker) runOneJobStep(

// It would be better to do the positive check, but there is no easy way to list all valid states here now.
if job.IsRollingback() {
if jobCtx.stepCtx != nil && jobCtx.stepCtx.Err() == nil {
// If the job switched to rolling back immediately after a reorg step
// timed out, the step context may still be active and hold reorg
// resources (workers, tickers, goroutines). Clean the step context
// explicitly to release those resources and avoid leaks before we
// continue rollback processing.
jobCtx.cleanStepCtx(dbterror.ErrCancelledDDLJob)
}
// when rolling back, we use worker context to process.
jobCtx.stepCtx = w.workCtx
} else {
@@ -876,7 +886,7 @@ func (w *worker) runOneJobStep(
defer close(stopCheckingJobCancelled)

jobCtx.initStepCtx()
defer jobCtx.cleanStepCtx()
defer jobCtx.cleanStepCtx(context.Canceled)
w.wg.Run(func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
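The signature change from cleanStepCtx() to cleanStepCtx(cause error) relies on Go's cancel-cause plumbing: the step context is cancelled with an explicit error, so code waiting on it can distinguish the rollback path (dbterror.ErrCancelledDDLJob) from the ordinary step teardown (context.Canceled). A minimal sketch of that standard-library mechanism, with errCancelledDDLJob standing in for TiDB's real error value:

package main

import (
	"context"
	"errors"
	"fmt"
)

// errCancelledDDLJob stands in for dbterror.ErrCancelledDDLJob; the real error
// value lives in TiDB and is only named here for illustration.
var errCancelledDDLJob = errors.New("cancelled DDL job")

func main() {
	// The step context is built with a cancel-cause function, so whoever tears
	// it down can record *why* it was cancelled.
	stepCtx, cancel := context.WithCancelCause(context.Background())

	// Rollback path: cancel with an explicit cause, mirroring
	// cleanStepCtx(dbterror.ErrCancelledDDLJob); the normal teardown path
	// passes context.Canceled instead.
	cancel(errCancelledDDLJob)

	fmt.Println(stepCtx.Err())          // context canceled
	fmt.Println(context.Cause(stepCtx)) // cancelled DDL job
}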
1 change: 1 addition & 0 deletions pkg/ddl/reorg.go
@@ -434,6 +434,7 @@ func (w *worker) runReorgJob(
w.mergeWarningsIntoJob(job)

rc.resetWarnings()
failpoint.InjectCall("onRunReorgJobTimeout")
return jobCtx.genReorgTimeoutErr()
}
}
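The only change to reorg.go is a failpoint callback fired right before the reorg-timeout error is returned, which is what lets TestCancelAfterReorgTimeout below synchronize with that branch. Conceptually it is a named test hook that is inert in production; a simplified stand-in for the pattern (onReorgTimeoutHook and runReorgStep are hypothetical names, not the actual failpoint API) looks like this:

package main

import (
	"errors"
	"fmt"
)

// onReorgTimeoutHook is a simplified stand-in for what failpoint.InjectCall
// gives TiDB: a named hook that does nothing unless a test wires it up,
// letting the test synchronize with a hard-to-reach code path (here, the
// reorg-job timeout branch).
var onReorgTimeoutHook func()

func runReorgStep() error {
	// ... reorg work elided ...
	if onReorgTimeoutHook != nil {
		onReorgTimeoutHook() // the test observes that the timeout branch was hit
	}
	return errors.New("wait reorg timeout") // plays the role of genReorgTimeoutErr
}

func main() {
	hit := make(chan struct{}, 1)
	onReorgTimeoutHook = func() { hit <- struct{}{} }
	_ = runReorgStep()
	<-hit
	fmt.Println("timeout path observed by the test")
}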
5 changes: 5 additions & 0 deletions pkg/disttask/framework/handle/BUILD.bazel
@@ -15,6 +15,11 @@ go_library(
"//pkg/util/memory",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
<<<<<<< HEAD
=======
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//util",
>>>>>>> 968e31fc3fe (ddl: cancel the job context before rolling back (#64130))
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
8 changes: 8 additions & 0 deletions pkg/disttask/framework/handle/handle.go
@@ -20,6 +20,12 @@ import (

"github.com/docker/go-units"
"github.com/pingcap/errors"
<<<<<<< HEAD
=======
"github.com/pingcap/failpoint"
litstorage "github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/config/kerneltype"
>>>>>>> 968e31fc3fe (ddl: cancel the job context before rolling back (#64130))
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/util/backoff"
@@ -82,6 +88,8 @@ func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, co
return nil, err
}

failpoint.InjectCall("afterDXFTaskSubmitted")

NotifyTaskChange()
return task, nil
}
6 changes: 6 additions & 0 deletions tests/realtikvtest/addindextest4/BUILD.bazel
@@ -9,10 +9,16 @@ go_test(
"main_test.go",
],
flaky = True,
<<<<<<< HEAD
=======
shard_count = 8,
>>>>>>> 968e31fc3fe (ddl: cancel the job context before rolling back (#64130))
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/disttask/framework/proto",
"//pkg/domain",
"//pkg/errno",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/testkit",
56 changes: 56 additions & 0 deletions tests/realtikvtest/addindextest4/integration_test.go
@@ -24,6 +24,8 @@ import (

"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
@@ -364,3 +366,57 @@ func TestMultiSchemaChangeAnalyzeOnlyOnce(t *testing.T) {
checkFn("alter table t modify column a bigint, modify column c char(5), modify column b bigint;", "all columns")
checkFn("alter table t modify column a bigint, modify column b bigint;", "") // no lossy change
}

func TestCancelAfterReorgTimeout(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")

tk.MustExec("create view all_global_tasks as select * from mysql.tidb_global_task union all select * from mysql.tidb_global_task_history;")
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1);")

// Mock the subtask executor encountering the same error continuously.
afterMeetErr := false
meetErr := make(chan struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeReadIndexStepExecRunSubtask", func(err *error) {
*err = errors.New("mock err")
if !afterMeetErr {
meetErr <- struct{}{}
afterMeetErr = true
}
})
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/updateProgressIntervalInMs", "return(10)") // Speed up the test.
var jobID int64
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
if job.Type != model.ActionAddIndex {
return
}
jobID = job.ID
})
afterTimeout := false
timeout := make(chan struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/handle/afterDXFTaskSubmitted", func() {
<-meetErr
<-timeout
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test;")
tk1.MustExec(fmt.Sprintf("admin cancel ddl jobs %d;", jobID))
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onRunReorgJobTimeout", func() {
if !afterTimeout {
timeout <- struct{}{}
afterTimeout = true
}
})
tk.MustGetErrCode("alter table t add index idx(a);", errno.ErrCancelledDDLJob)
require.Eventually(t, func() bool {
result := tk.MustQuery("select state from all_global_tasks;").Rows()
require.Greater(t, len(result), 0)
state := result[0][0].(string)
done := state == proto.TaskStateSucceed.String() ||
state == proto.TaskStateReverted.String() ||
state == proto.TaskStateFailed.String()
return done
}, 10*time.Second, 300*time.Millisecond)
}