66 "bytes"
77 "context"
88 "fmt"
9- "strings"
109 "sync"
1110 "testing"
1211 "time"
@@ -498,6 +497,85 @@ func TestEnableCheckPointLimit(t *testing.T) {
498497 }
499498}
500499
500+ func TestOwnerChangeCheckPointLagged (t * testing.T ) {
501+ c := createFakeCluster (t , 4 , false )
502+ defer func () {
503+ fmt .Println (c )
504+ }()
505+ c .splitAndScatter ("01" , "02" , "022" , "023" , "033" , "04" , "043" )
506+ ctx , cancel := context .WithCancel (context .Background ())
507+ defer cancel ()
508+
509+ env := newTestEnv (c , t )
510+ rngs := env .ranges
511+ if len (rngs ) == 0 {
512+ rngs = []kv.KeyRange {{}}
513+ }
514+ env .task = streamhelper.TaskEvent {
515+ Type : streamhelper .EventAdd ,
516+ Name : "whole" ,
517+ Info : & backup.StreamBackupTaskInfo {
518+ Name : "whole" ,
519+ StartTs : oracle .GoTimeToTS (oracle .GetTimeFromTS (0 ).Add (1 * time .Minute )),
520+ },
521+ Ranges : rngs ,
522+ }
523+
524+ adv := streamhelper .NewCheckpointAdvancer (env )
525+ adv .UpdateConfigWith (func (c * config.Config ) {
526+ c .CheckPointLagLimit = 1 * time .Minute
527+ })
528+ ctx1 , cancel1 := context .WithCancel (context .Background ())
529+ adv .OnStart (ctx1 )
530+ adv .OnBecomeOwner (ctx1 )
531+ log .Info ("advancer1 become owner" )
532+ require .NoError (t , adv .OnTick (ctx1 ))
533+
534+ // another advancer but never advance checkpoint before
535+ adv2 := streamhelper .NewCheckpointAdvancer (env )
536+ adv2 .UpdateConfigWith (func (c * config.Config ) {
537+ c .CheckPointLagLimit = 1 * time .Minute
538+ })
539+ ctx2 , cancel2 := context .WithCancel (context .Background ())
540+ adv2 .OnStart (ctx2 )
541+
542+ for i := 0 ; i < 5 ; i ++ {
543+ c .advanceClusterTimeBy (2 * time .Minute )
544+ c .advanceCheckpointBy (2 * time .Minute )
545+ require .NoError (t , adv .OnTick (ctx1 ))
546+ }
547+ c .advanceClusterTimeBy (2 * time .Minute )
548+ require .ErrorContains (t , adv .OnTick (ctx1 ), "lagged too large" )
549+
550+ // resume task to make next tick normally
551+ c .advanceCheckpointBy (2 * time .Minute )
552+ env .ResumeTask (ctx )
553+
554+ // stop advancer1, and advancer2 should take over
555+ cancel1 ()
556+ log .Info ("advancer1 owner canceled, and advancer2 become owner" )
557+ adv2 .OnBecomeOwner (ctx2 )
558+ require .NoError (t , adv2 .OnTick (ctx2 ))
559+
560+ // advancer2 should take over and tick normally
561+ for i := 0 ; i < 10 ; i ++ {
562+ c .advanceClusterTimeBy (2 * time .Minute )
563+ c .advanceCheckpointBy (2 * time .Minute )
564+ require .NoError (t , adv2 .OnTick (ctx2 ))
565+ }
566+ c .advanceClusterTimeBy (2 * time .Minute )
567+ require .ErrorContains (t , adv2 .OnTick (ctx2 ), "lagged too large" )
568+ // stop advancer2, and advancer1 should take over
569+ c .advanceCheckpointBy (2 * time .Minute )
570+ env .ResumeTask (ctx )
571+ cancel2 ()
572+ log .Info ("advancer2 owner canceled, and advancer1 become owner" )
573+
574+ adv .OnBecomeOwner (ctx )
575+ // advancer1 should take over and tick normally when come back
576+ require .NoError (t , adv .OnTick (ctx ))
577+ }
578+
501579func TestCheckPointLagged (t * testing.T ) {
502580 c := createFakeCluster (t , 4 , false )
503581 defer func () {
@@ -528,8 +606,10 @@ func TestCheckPointLagged(t *testing.T) {
528606 })
529607 adv .StartTaskListener (ctx )
530608 c .advanceClusterTimeBy (2 * time .Minute )
609+ // if global ts is not advanced, the checkpoint will not be lagged
610+ c .advanceCheckpointBy (2 * time .Minute )
531611 require .NoError (t , adv .OnTick (ctx ))
532- c .advanceClusterTimeBy (1 * time .Minute )
612+ c .advanceClusterTimeBy (3 * time .Minute )
533613 require .ErrorContains (t , adv .OnTick (ctx ), "lagged too large" )
534614 // after some times, the isPaused will be set and ticks are skipped
535615 require .Eventually (t , func () bool {
@@ -553,8 +633,10 @@ func TestCheckPointResume(t *testing.T) {
553633 })
554634 adv .StartTaskListener (ctx )
555635 c .advanceClusterTimeBy (1 * time .Minute )
636+ // if global ts is not advanced, the checkpoint will not be lagged
637+ c .advanceCheckpointBy (1 * time .Minute )
556638 require .NoError (t , adv .OnTick (ctx ))
557- c .advanceClusterTimeBy (1 * time .Minute )
639+ c .advanceClusterTimeBy (2 * time .Minute )
558640 require .ErrorContains (t , adv .OnTick (ctx ), "lagged too large" )
559641 require .Eventually (t , func () bool {
560642 return assert .NoError (t , adv .OnTick (ctx ))
@@ -584,18 +666,48 @@ func TestUnregisterAfterPause(t *testing.T) {
584666 c .CheckPointLagLimit = 1 * time .Minute
585667 })
586668 adv .StartTaskListener (ctx )
669+
670+ // wait for the task to be added
671+ require .Eventually (t , func () bool {
672+ return adv .HasTask ()
673+ }, 5 * time .Second , 100 * time .Millisecond )
674+
675+ // task is should be paused when global checkpoint is laggeod
676+ // even the global checkpoint is equal to task start ts(not advanced all the time)
587677 c .advanceClusterTimeBy (1 * time .Minute )
588678 require .NoError (t , adv .OnTick (ctx ))
589679 env .PauseTask (ctx , "whole" )
590- time .Sleep (1 * time .Second )
591680 c .advanceClusterTimeBy (1 * time .Minute )
681+ require .Error (t , adv .OnTick (ctx ), "checkpoint is lagged" )
682+ env .unregisterTask ()
683+ env .putTask ()
684+
685+ // wait for the task to be added
686+ require .Eventually (t , func () bool {
687+ return adv .HasTask ()
688+ }, 5 * time .Second , 100 * time .Millisecond )
689+
690+ require .Error (t , adv .OnTick (ctx ), "checkpoint is lagged" )
691+
692+ env .unregisterTask ()
693+ // wait for the task to be deleted
694+ require .Eventually (t , func () bool {
695+ return ! adv .HasTask ()
696+ }, 5 * time .Second , 100 * time .Millisecond )
697+
698+ // reset
699+ c .advanceClusterTimeBy (- 1 * time .Minute )
592700 require .NoError (t , adv .OnTick (ctx ))
701+ env .PauseTask (ctx , "whole" )
702+ c .advanceClusterTimeBy (1 * time .Minute )
593703 env .unregisterTask ()
594704 env .putTask ()
705+ // wait for the task to be add
595706 require .Eventually (t , func () bool {
596- err := adv .OnTick (ctx )
597- return err != nil && strings .Contains (err .Error (), "check point lagged too large" )
598- }, 5 * time .Second , 300 * time .Millisecond )
707+ return adv .HasTask ()
708+ }, 5 * time .Second , 100 * time .Millisecond )
709+
710+ require .Error (t , adv .OnTick (ctx ), "checkpoint is lagged" )
599711}
600712
601713// If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally.
@@ -707,13 +819,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) {
707819 adv .UpdateConfigWith (func (c * config.Config ) {
708820 c .CheckPointLagLimit = 1 * time .Minute
709821 })
822+ adv .StartTaskListener (ctx )
710823 c .advanceClusterTimeBy (3 * time .Minute )
711824 c .advanceCheckpointBy (1 * time .Minute )
712825 env .advanceCheckpointBy (2 * time .Minute )
713826 env .mockPDConnectionError ()
714- adv .StartTaskListener (ctx )
715- // Try update checkpoint
716- require .NoError (t , adv .OnTick (ctx ))
827+ // if cannot connect to pd, the checkpoint will be rolled back
828+ // because at this point. the global ts is 2 minutes
829+ // and the local checkpoint ts is 1 minute
830+ require .Error (t , adv .OnTick (ctx ), "checkpoint rollback" )
831+
832+ // only when local checkpoint > global ts, the next tick will be normal
833+ c .advanceCheckpointBy (12 * time .Minute )
717834 // Verify no err raised
718835 require .NoError (t , adv .OnTick (ctx ))
719836}
@@ -747,11 +864,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) {
747864 adv .UpdateConfigWith (func (c * config.Config ) {
748865 c .CheckPointLagLimit = 1 * time .Minute
749866 })
750- c .advanceClusterTimeBy (3 * time .Minute )
867+ // advance cluster time to 4 minutes, and checkpoint to 1 minutes
868+ // if start ts equals to checkpoint, the task will not be paused
869+ adv .StartTaskListener (ctx )
870+ c .advanceClusterTimeBy (2 * time .Minute )
871+ c .advanceCheckpointBy (1 * time .Minute )
872+ env .advanceCheckpointBy (1 * time .Minute )
873+ require .NoError (t , adv .OnTick (ctx ))
874+
875+ c .advanceClusterTimeBy (2 * time .Minute )
751876 c .advanceCheckpointBy (1 * time .Minute )
752877 env .advanceCheckpointBy (1 * time .Minute )
753- env .mockPDConnectionError ()
754- adv .StartTaskListener (ctx )
755878 // Try update checkpoint
756879 require .ErrorContains (t , adv .OnTick (ctx ), "lagged too large" )
757880 // Verify no err raised after paused
0 commit comments