Skip to content

Commit 1557a95

Browse files
authored
Resume from checkpoint (#1595)
* add Checkpoint table and read/write funcs * handle no checkpoints returned * store min and max range values in checkpoint * resume from checkpoint * add checkpoint file * fix unique key args * update applier coordinates from _ghc heartbeat * fix test * fix linter * make checkpoint interval configurable * write checkpoint iteration number * store rows copied & dml applied * truncate column name if necessary * drop checkpoint table for final cleanup * add docs * add resume doc
1 parent 0284a97 commit 1557a95

File tree

16 files changed

+696
-114
lines changed

16 files changed

+696
-114
lines changed

‎doc/command-line-flags.md‎

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant
6464
### binlogsyncer-max-reconnect-attempts
6565
`--binlogsyncer-max-reconnect-attempts=0`, the maximum number of attempts to re-establish a broken inspector connection for sync binlog. `0` or `negative number` means infinite retry, default `0`
6666

67+
### checkpoint
68+
69+
`--checkpoint` enables periodic checkpoints of the gh-ost's state so that gh-ost can resume a migration from the checkpoint with `--resume`. Checkpoints are written to a separate table named `_${original_table_name}_ghk`. It is recommended to use with `--gtid` for checkpoints.
70+
See also: [`resuming-migrations`](resume.md)
71+
72+
### checkpoint-seconds
73+
74+
`--checkpoint-seconds` specifies the seconds between checkpoints. Default is 300.
75+
6776
### conf
6877

6978
`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
@@ -226,6 +235,11 @@ Optionally involve the process ID, for example: `--replica-server-id=$((10000000
226235
It's on you to choose a number that does not collide with another `gh-ost` or another running replica.
227236
See also: [`concurrent-migrations`](cheatsheet.md#concurrent-migrations) on the cheatsheet.
228237

238+
### resume
239+
240+
`--resume` attempts to resume a migration that was previously interrupted from the last checkpoint. The first `gh-ost` invocation must run with `--checkpoint` and have successfully written a checkpoint in order for `--resume` to work.
241+
See also: [`resuming-migrations`](resume.md)
242+
229243
### serve-socket-file
230244

231245
Defaults to an auto-determined and advertised upon startup file. Defines Unix socket file to serve on.

‎doc/resume.md‎

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Resuming Migrations
2+
3+
`gh-ost` can attempt to resume an interrupted migration from a checkpoint if the following conditions are met:
4+
- The first `gh-ost` process was invoked with `--checkpoint`
5+
- The first `gh-ost` process had at least one successful checkpoint
6+
- The binlogs from the last checkpoint's binlog coordinates still exist on the replica gh-ost is inspecting (specified by `--host`)
7+
8+
To resume, invoke `gh-ost` again with the same arguments with the `--resume` flag.
9+
10+
> [!WARNING]
11+
> It is recommended use `--checkpoint` with `--gtid` enabled so that checkpoint binlog coordinates store GTID sets rather than file positions. In that case, `gh-ost` can resume using a different replica than it originally attached to.
12+
13+
## Example
14+
The migration starts with a `gh-ost` invocation such as:
15+
```shell
16+
gh-ost \
17+
--chunk-size=100 \
18+
--host=replica1.company.com \
19+
--database="mydb" \
20+
--table="mytable" \
21+
--alter="add column mycol varchar(20)"
22+
--gtid \
23+
--checkpoint \
24+
--checkpoint-seconds=60 \
25+
--execute
26+
```
27+
28+
In this example `gh-ost` writes a checkpoint to a table `_mytable_ghk` every 60 seconds. After `gh-ost` is interrupted/killed, the migration can be resumed with:
29+
```shell
30+
# resume migration
31+
gh-ost \
32+
--chunk-size=100
33+
--host=replica1.company.com \
34+
--database="mydb" \
35+
--table="mytable" \
36+
--alter="add column mycol varchar(20)"
37+
--gtid \
38+
--resume \
39+
--execute
40+
```
41+
42+
`gh-ost` then reconnects at the binlog coordinates of the last checkpoint and resumes copying rows at the chunk specified by the checkpoint. The data integrity of the ghost table is preserved because `gh-ost` applies row DMLs and copies row in an idempotent way.

‎go/base/context.go‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ type MigrationContext struct {
103103
GoogleCloudPlatform bool
104104
AzureMySQL bool
105105
AttemptInstantDDL bool
106+
Resume bool
106107

107108
// SkipPortValidation allows skipping the port validation in `ValidateConnection`
108109
// This is useful when connecting to a MySQL instance where the external port
@@ -153,6 +154,8 @@ type MigrationContext struct {
153154
HooksHintToken string
154155
HooksStatusIntervalSec int64
155156
PanicOnWarnings bool
157+
Checkpoint bool
158+
CheckpointIntervalSeconds int64
156159

157160
DropServeSocket bool
158161
ServeSocketFile string
@@ -239,6 +242,7 @@ type MigrationContext struct {
239242
Iteration int64
240243
MigrationIterationRangeMinValues *sql.ColumnValues
241244
MigrationIterationRangeMaxValues *sql.ColumnValues
245+
InitialStreamerCoords mysql.BinlogCoordinates
242246
ForceTmpTableName string
243247

244248
IncludeTriggers bool
@@ -380,6 +384,15 @@ func (this *MigrationContext) GetChangelogTableName() string {
380384
}
381385
}
382386

387+
// GetCheckpointTableName generates the name of checkpoint table.
388+
func (this *MigrationContext) GetCheckpointTableName() string {
389+
if this.ForceTmpTableName != "" {
390+
return getSafeTableName(this.ForceTmpTableName, "ghk")
391+
} else {
392+
return getSafeTableName(this.OriginalTableName, "ghk")
393+
}
394+
}
395+
383396
// GetVoluntaryLockName returns a name of a voluntary lock to be used throughout
384397
// the swap-tables process.
385398
func (this *MigrationContext) GetVoluntaryLockName() string {

‎go/cmd/gh-ost/main.go‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ func main() {
145145
flag.StringVar(&migrationContext.TriggerSuffix, "trigger-suffix", "", "Add a suffix to the trigger name (i.e '_v2'). Requires '--include-triggers'")
146146
flag.BoolVar(&migrationContext.RemoveTriggerSuffix, "remove-trigger-suffix-if-exists", false, "Remove given suffix from name of trigger. Requires '--include-triggers' and '--trigger-suffix'")
147147
flag.BoolVar(&migrationContext.SkipPortValidation, "skip-port-validation", false, "Skip port validation for MySQL connections")
148+
flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Enable migration checkpoints")
149+
flag.Int64Var(&migrationContext.CheckpointIntervalSeconds, "checkpoint-seconds", 300, "The number of seconds between checkpoints")
150+
flag.BoolVar(&migrationContext.Resume, "resume", false, "Attempt to resume migration from checkpoint")
148151

149152
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
150153
criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits")
@@ -284,6 +287,9 @@ func main() {
284287
if *storageEngine == "rocksdb" {
285288
migrationContext.Log.Warning("RocksDB storage engine support is experimental")
286289
}
290+
if migrationContext.CheckpointIntervalSeconds < 10 {
291+
migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10")
292+
}
287293

288294
switch *cutOver {
289295
case "atomic", "default", "":
@@ -316,6 +322,7 @@ func main() {
316322
}
317323
migrationContext.CliPassword = string(bytePassword)
318324
}
325+
319326
migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
320327
migrationContext.SetNiceRatio(*niceRatio)
321328
migrationContext.SetChunkSize(*chunkSize)

‎go/logic/applier.go‎

Lines changed: 142 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2021 GitHub Inc.
2+
Copyright 2025 GitHub Inc.
33
See https://github.com/github/gh-ost/blob/master/LICENSE
44
*/
55

@@ -21,6 +21,9 @@ import (
2121
"context"
2222
"database/sql/driver"
2323

24+
"errors"
25+
"sync"
26+
2427
"github.com/github/gh-ost/go/mysql"
2528
drivermysql "github.com/go-sql-driver/mysql"
2629
"github.com/openark/golib/sqlutils"
@@ -31,6 +34,9 @@ const (
3134
atomicCutOverMagicHint = "ghost-cut-over-sentry"
3235
)
3336

37+
// ErrNoCheckpointFound is returned when an empty checkpoint table is queried.
38+
var ErrNoCheckpointFound = errors.New("no checkpoint found in _ghk table")
39+
3440
type dmlBuildResult struct {
3541
query string
3642
args []interface{}
@@ -66,9 +72,17 @@ type Applier struct {
6672
finishedMigrating int64
6773
name string
6874

69-
dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
70-
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
71-
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
75+
CurrentCoordinatesMutex sync.Mutex
76+
CurrentCoordinates mysql.BinlogCoordinates
77+
78+
LastIterationRangeMutex sync.Mutex
79+
LastIterationRangeMinValues *sql.ColumnValues
80+
LastIterationRangeMaxValues *sql.ColumnValues
81+
82+
dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
83+
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
84+
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
85+
checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder
7286
}
7387

7488
func NewApplier(migrationContext *base.MigrationContext) *Applier {
@@ -144,6 +158,15 @@ func (this *Applier) prepareQueries() (err error) {
144158
); err != nil {
145159
return err
146160
}
161+
if this.migrationContext.Checkpoint {
162+
if this.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder(
163+
this.migrationContext.DatabaseName,
164+
this.migrationContext.GetCheckpointTableName(),
165+
&this.migrationContext.UniqueKey.Columns,
166+
); err != nil {
167+
return err
168+
}
169+
}
147170
return nil
148171
}
149172

@@ -400,6 +423,54 @@ func (this *Applier) CreateChangelogTable() error {
400423
return nil
401424
}
402425

426+
// Create the checkpoint table to store the chunk copy and applier state.
427+
// There are two sets of columns with the same types as the shared unique key,
428+
// one for IterationMinValues and one for IterationMaxValues.
429+
func (this *Applier) CreateCheckpointTable() error {
430+
if err := this.DropCheckpointTable(); err != nil {
431+
return err
432+
}
433+
colDefs := []string{
434+
"`gh_ost_chk_id` bigint auto_increment primary key",
435+
"`gh_ost_chk_timestamp` bigint",
436+
"`gh_ost_chk_coords` varchar(4096)",
437+
"`gh_ost_chk_iteration` bigint",
438+
"`gh_ost_rows_copied` bigint",
439+
"`gh_ost_dml_applied` bigint",
440+
}
441+
for _, col := range this.migrationContext.UniqueKey.Columns.Columns() {
442+
if col.MySQLType == "" {
443+
return fmt.Errorf("CreateCheckpoinTable: column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name))
444+
}
445+
minColName := sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4) + "_min"
446+
colDef := fmt.Sprintf("%s %s", sql.EscapeName(minColName), col.MySQLType)
447+
if !col.Nullable {
448+
colDef += " NOT NULL"
449+
}
450+
colDefs = append(colDefs, colDef)
451+
}
452+
453+
for _, col := range this.migrationContext.UniqueKey.Columns.Columns() {
454+
maxColName := sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4) + "_max"
455+
colDef := fmt.Sprintf("%s %s", sql.EscapeName(maxColName), col.MySQLType)
456+
if !col.Nullable {
457+
colDef += " NOT NULL"
458+
}
459+
colDefs = append(colDefs, colDef)
460+
}
461+
462+
query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)",
463+
sql.EscapeName(this.migrationContext.DatabaseName),
464+
sql.EscapeName(this.migrationContext.GetCheckpointTableName()),
465+
strings.Join(colDefs, ",\n "),
466+
)
467+
this.migrationContext.Log.Infof("Created checkpoint table")
468+
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
469+
return err
470+
}
471+
return nil
472+
}
473+
403474
// dropTable drops a given table on the applied host
404475
func (this *Applier) dropTable(tableName string) error {
405476
query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
@@ -494,6 +565,11 @@ func (this *Applier) DropChangelogTable() error {
494565
return this.dropTable(this.migrationContext.GetChangelogTableName())
495566
}
496567

568+
// DropCheckpointTable drops the checkpoint table on applier host
569+
func (this *Applier) DropCheckpointTable() error {
570+
return this.dropTable(this.migrationContext.GetCheckpointTableName())
571+
}
572+
497573
// DropOldTable drops the _Old table on the applier host
498574
func (this *Applier) DropOldTable() error {
499575
return this.dropTable(this.migrationContext.GetOldTableName())
@@ -542,6 +618,60 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
542618
return this.WriteAndLogChangelog("state", value)
543619
}
544620

621+
// WriteCheckpoints writes a checkpoint to the _ghk table.
622+
func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) {
623+
var insertId int64
624+
uniqueKeyArgs := sqlutils.Args(chk.IterationRangeMin.AbstractValues()...)
625+
uniqueKeyArgs = append(uniqueKeyArgs, chk.IterationRangeMax.AbstractValues()...)
626+
query, uniqueKeyArgs, err := this.checkpointInsertQueryBuilder.BuildQuery(uniqueKeyArgs)
627+
if err != nil {
628+
return insertId, err
629+
}
630+
args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied)
631+
args = append(args, uniqueKeyArgs...)
632+
res, err := this.db.Exec(query, args...)
633+
if err != nil {
634+
return insertId, err
635+
}
636+
return res.LastInsertId()
637+
}
638+
639+
func (this *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
640+
row := this.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName()))
641+
chk := &Checkpoint{
642+
IterationRangeMin: sql.NewColumnValues(this.migrationContext.UniqueKey.Columns.Len()),
643+
IterationRangeMax: sql.NewColumnValues(this.migrationContext.UniqueKey.Columns.Len()),
644+
}
645+
646+
var coordStr string
647+
var timestamp int64
648+
ptrs := []interface{}{&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied}
649+
ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...)
650+
ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...)
651+
err := row.Scan(ptrs...)
652+
if err != nil {
653+
if errors.Is(err, gosql.ErrNoRows) {
654+
return nil, ErrNoCheckpointFound
655+
}
656+
return nil, err
657+
}
658+
chk.Timestamp = time.Unix(timestamp, 0)
659+
if this.migrationContext.UseGTIDs {
660+
gtidCoords, err := mysql.NewGTIDBinlogCoordinates(coordStr)
661+
if err != nil {
662+
return nil, err
663+
}
664+
chk.LastTrxCoords = gtidCoords
665+
} else {
666+
fileCoords, err := mysql.ParseFileBinlogCoordinates(coordStr)
667+
if err != nil {
668+
return nil, err
669+
}
670+
chk.LastTrxCoords = fileCoords
671+
}
672+
return chk, nil
673+
}
674+
545675
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
546676
// This is done asynchronously
547677
func (this *Applier) InitiateHeartbeat() {
@@ -686,8 +816,15 @@ func (this *Applier) ReadMigrationRangeValues() error {
686816
// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
687817
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
688818
// no further chunk to work through, i.e. we're past the last chunk and are done with
689-
// iterating the range (and this done with copying row chunks)
819+
// iterating the range (and thus done with copying row chunks)
690820
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
821+
this.LastIterationRangeMutex.Lock()
822+
if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil {
823+
this.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone()
824+
this.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone()
825+
}
826+
this.LastIterationRangeMutex.Unlock()
827+
691828
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
692829
if this.migrationContext.MigrationIterationRangeMinValues == nil {
693830
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues

0 commit comments

Comments
 (0)