@@ -4,41 +4,50 @@ import (
44 "context"
55 "database/sql"
66 "errors"
7- "fmt"
87 "io"
98 "io/ioutil"
109 "net/url"
1110 "regexp"
1211 "strconv"
12+ "time"
1313
14+ "github.com/cenkalti/backoff/v4"
1415 "github.com/golang-migrate/migrate/v4"
1516 "github.com/golang-migrate/migrate/v4/database"
1617 "github.com/hashicorp/go-multierror"
1718 "github.com/lib/pq"
1819 "go.uber.org/atomic"
1920)
2021
21- func init () {
22- db := YugabyteDB {}
23- database .Register ("yugabyte" , & db )
24- database .Register ("yugabytedb" , & db )
25- database .Register ("ysql" , & db )
26- }
27-
28- var DefaultMigrationsTable = "migrations"
29- var DefaultLockTable = "migrations_locks"
22+ const (
23+ DefaultRetryMaxInterval = time .Second * 15
24+ DefaultRetryMaxElapsedTime = time .Second * 30
25+ DefaultRetryMaxRetries = 10
26+ DefaultMigrationsTable = "migrations"
27+ DefaultLockTable = "migrations_locks"
28+ )
3029
3130var (
3231 ErrNilConfig = errors .New ("no config" )
3332 ErrNoDatabaseName = errors .New ("no database name" )
3433 ErrMaxRetriesExceeded = errors .New ("max retries exceeded" )
3534)
3635
36+ func init () {
37+ db := YugabyteDB {}
38+ database .Register ("yugabyte" , & db )
39+ database .Register ("yugabytedb" , & db )
40+ database .Register ("ysql" , & db )
41+ }
42+
3743type Config struct {
38- MigrationsTable string
39- LockTable string
40- ForceLock bool
41- DatabaseName string
44+ MigrationsTable string
45+ LockTable string
46+ ForceLock bool
47+ DatabaseName string
48+ RetryMaxInterval time.Duration
49+ RetryMaxElapsedTime time.Duration
50+ RetryMaxRetries int
4251}
4352
4453type YugabyteDB struct {
@@ -80,6 +89,18 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
8089 config .LockTable = DefaultLockTable
8190 }
8291
92+ if config .RetryMaxInterval == 0 {
93+ config .RetryMaxInterval = DefaultRetryMaxInterval
94+ }
95+
96+ if config .RetryMaxElapsedTime == 0 {
97+ config .RetryMaxElapsedTime = DefaultRetryMaxElapsedTime
98+ }
99+
100+ if config .RetryMaxRetries == 0 {
101+ config .RetryMaxRetries = DefaultRetryMaxRetries
102+ }
103+
83104 px := & YugabyteDB {
84105 db : instance ,
85106 config : config ,
@@ -129,11 +150,32 @@ func (c *YugabyteDB) Open(dbURL string) (database.Driver, error) {
129150 forceLock = false
130151 }
131152
153+ maxIntervalStr := purl .Query ().Get ("x-retry-max-interval" )
154+ maxInterval , err := time .ParseDuration (maxIntervalStr )
155+ if err != nil {
156+ maxInterval = DefaultRetryMaxInterval
157+ }
158+
159+ maxElapsedTimeStr := purl .Query ().Get ("x-retry-max-elapsed-time" )
160+ maxElapsedTime , err := time .ParseDuration (maxElapsedTimeStr )
161+ if err != nil {
162+ maxElapsedTime = DefaultRetryMaxElapsedTime
163+ }
164+
165+ maxRetriesStr := purl .Query ().Get ("x-retry-max-retries" )
166+ maxRetries , err := strconv .Atoi (maxRetriesStr )
167+ if err != nil {
168+ maxRetries = DefaultRetryMaxRetries
169+ }
170+
132171 px , err := WithInstance (db , & Config {
133- DatabaseName : purl .Path ,
134- MigrationsTable : migrationsTable ,
135- LockTable : lockTable ,
136- ForceLock : forceLock ,
172+ DatabaseName : purl .Path ,
173+ MigrationsTable : migrationsTable ,
174+ LockTable : lockTable ,
175+ ForceLock : forceLock ,
176+ RetryMaxInterval : maxInterval ,
177+ RetryMaxElapsedTime : maxElapsedTime ,
178+ RetryMaxRetries : maxRetries ,
137179 })
138180 if err != nil {
139181 return nil , err
@@ -150,7 +192,7 @@ func (c *YugabyteDB) Close() error {
150192// See: https://github.com/yugabyte/yugabyte-db/issues/3642
151193func (c * YugabyteDB ) Lock () error {
152194 return database .CasRestoreOnErr (& c .isLocked , false , true , database .ErrLocked , func () (err error ) {
153- return ExecuteInTx (context .Background (), c . db , & sql.TxOptions {Isolation : sql .LevelSerializable }, func (tx * sql.Tx ) (err error ) {
195+ return c . doTxWithRetry (context .Background (), & sql.TxOptions {Isolation : sql .LevelSerializable }, func (tx * sql.Tx ) (err error ) {
154196 aid , err := database .GenerateAdvisoryLockId (c .config .DatabaseName )
155197 if err != nil {
156198 return err
@@ -228,7 +270,7 @@ func (c *YugabyteDB) Run(migration io.Reader) error {
228270}
229271
230272func (c * YugabyteDB ) SetVersion (version int , dirty bool ) error {
231- return ExecuteInTx (context .Background (), c . db , & sql.TxOptions {Isolation : sql .LevelSerializable }, func (tx * sql.Tx ) error {
273+ return c . doTxWithRetry (context .Background (), & sql.TxOptions {Isolation : sql .LevelSerializable }, func (tx * sql.Tx ) error {
232274 if _ , err := tx .Exec (`DELETE FROM "` + c .config .MigrationsTable + `"` ); err != nil {
233275 return err
234276 }
@@ -363,50 +405,55 @@ func (c *YugabyteDB) ensureLockTable() error {
363405 return nil
364406}
365407
366- func ExecuteInTx (ctx context.Context , db * sql.DB , opts * sql.TxOptions , fn func (tx * sql.Tx ) error ) (err error ) {
367- // It looks like currently, YugabyteDB doesn't return an error on savepoint release.
368- // So, we have to retry the whole transaction.
369- // TODO: do more research on savepoints support, see https://github.com/yugabyte/yugabyte-db/issues/9219
408+ func (c * YugabyteDB ) doTxWithRetry (
409+ ctx context.Context ,
410+ txOpts * sql.TxOptions ,
411+ fn func (tx * sql.Tx ) error ,
412+ ) error {
413+ backOff := c .newBackoff (ctx )
370414
371- const maxRetries = 50
372- retryCount := 0
373-
374- for {
375- if retryCount - 1 >= maxRetries {
376- return fmt .Errorf ("%w: reached %d attempts" , ErrMaxRetriesExceeded , retryCount - 1 )
377- }
378-
379- tx , err := db .BeginTx (ctx , opts )
415+ return backoff .Retry (func () error {
416+ tx , err := c .db .BeginTx (ctx , txOpts )
380417 if err != nil {
381- return err
418+ return backoff . Permanent ( err )
382419 }
383420
384- if err := fn (tx ); err != nil {
385- _ = tx .Rollback ()
386-
387- if ! errIsRetryable (err ) {
388- return err
389- }
421+ defer tx .Rollback ()
390422
391- retryCount ++
423+ if err := fn (tx ); err != nil && ! errIsRetryable (err ) {
424+ return backoff .Permanent (err )
425+ } else if err != nil {
426+ return err
427+ }
392428
393- continue
429+ if err := tx .Commit (); err != nil && ! errIsRetryable (err ) {
430+ return backoff .Permanent (err )
431+ } else if err != nil {
432+ return err
394433 }
395434
396- if err := tx .Commit (); err != nil {
397- _ = tx .Rollback ()
435+ return nil
436+ }, backOff )
437+ }
398438
399- if ! errIsRetryable (err ) {
400- return err
401- }
439+ func (c * YugabyteDB ) newBackoff (ctx context.Context ) backoff.BackOff {
440+ if ctx == nil {
441+ ctx = context .Background ()
442+ }
402443
403- retryCount ++
444+ retrier := backoff .WithMaxRetries (backoff .WithContext (& backoff.ExponentialBackOff {
445+ InitialInterval : backoff .DefaultInitialInterval ,
446+ RandomizationFactor : backoff .DefaultRandomizationFactor ,
447+ Multiplier : backoff .DefaultMultiplier ,
448+ MaxInterval : c .config .RetryMaxInterval ,
449+ MaxElapsedTime : c .config .RetryMaxElapsedTime ,
450+ Stop : backoff .Stop ,
451+ Clock : backoff .SystemClock ,
452+ }, ctx ), uint64 (c .config .RetryMaxRetries ))
404453
405- continue
406- }
454+ retrier .Reset ()
407455
408- return nil
409- }
456+ return retrier
410457}
411458
412459func errIsRetryable (err error ) bool {
0 commit comments