Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ type MigrationContext struct {
CutOverType CutOver
ReplicaServerId uint

// Number of workers used by the trx coordinator
NumWorkers int

Hostname string
AssumeMasterHostname string
ApplierTimeZone string
Expand Down
110 changes: 30 additions & 80 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
Copyright 2022 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package binlog
Expand All @@ -11,7 +11,6 @@ import (

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"

"time"

Expand Down Expand Up @@ -85,59 +84,17 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates
return this.currentCoordinates.Clone()
}

func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
currentCoords := this.GetCurrentBinlogCoordinates()
dml := ToEventDML(ev.Header.EventType.String())
if dml == NotDML {
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
}
for i, row := range rowsEvent.Rows {
if dml == UpdateDML && i%2 == 1 {
// An update has two rows (WHERE+SET)
// We do both at the same time
continue
}
binlogEntry := NewBinlogEntryAt(currentCoords)
binlogEntry.DmlEvent = NewBinlogDMLEvent(
string(rowsEvent.Table.Schema),
string(rowsEvent.Table.Table),
dml,
)
switch dml {
case InsertDML:
{
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
}
case UpdateDML:
{
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
}
case DeleteDML:
{
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
}
}

// The channel will do the throttling. Whoever is reading from the channel
// decides whether action is taken synchronously (meaning we wait before
// next iteration) or asynchronously (we keep pushing more events)
// In reality, reads will be synchronous
entriesChannel <- binlogEntry
}
return nil
}

// StreamEvents
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
if canStopStreaming() {
return nil
}
// StreamEvents reads binlog events and sends them to the given channel.
// It is blocking and should be executed in a goroutine.
func (this *GoMySQLReader) StreamEvents(ctx context.Context, canStopStreaming func() bool, eventChannel chan<- *replication.BinlogEvent) error {
for {
if canStopStreaming() {
break
return nil
}
if err := ctx.Err(); err != nil {
return err
}
ev, err := this.binlogStreamer.GetEvent(context.Background())
ev, err := this.binlogStreamer.GetEvent(ctx)
if err != nil {
return err
}
Expand All @@ -159,45 +116,38 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha

switch event := ev.Event.(type) {
case *replication.GTIDEvent:
if !this.migrationContext.UseGTIDs {
continue
}
sid, err := uuid.FromBytes(event.SID)
if err != nil {
return err
}
this.currentCoordinatesMutex.Lock()
if this.LastTrxCoords != nil {
this.currentCoordinates = this.LastTrxCoords.Clone()
if this.migrationContext.UseGTIDs {
sid, err := uuid.FromBytes(event.SID)
if err != nil {
return err
}
this.currentCoordinatesMutex.Lock()
if this.LastTrxCoords != nil {
this.currentCoordinates = this.LastTrxCoords.Clone()
}
coords := this.currentCoordinates.(*mysql.GTIDBinlogCoordinates)
trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: event.GNO, Stop: event.GNO + 1})
coords.GTIDSet.AddSet(trxGset)
this.currentCoordinatesMutex.Unlock()
}
coords := this.currentCoordinates.(*mysql.GTIDBinlogCoordinates)
trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: event.GNO, Stop: event.GNO + 1})
coords.GTIDSet.AddSet(trxGset)
this.currentCoordinatesMutex.Unlock()
case *replication.RotateEvent:
if this.migrationContext.UseGTIDs {
continue
if !this.migrationContext.UseGTIDs {
this.currentCoordinatesMutex.Lock()
coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates)
coords.LogFile = string(event.NextLogName)
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", coords.LogFile, int64(ev.Header.LogPos), event.NextLogName)
this.currentCoordinatesMutex.Unlock()
}
this.currentCoordinatesMutex.Lock()
coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates)
coords.LogFile = string(event.NextLogName)
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", coords.LogFile, int64(ev.Header.LogPos), event.NextLogName)
this.currentCoordinatesMutex.Unlock()
case *replication.XIDEvent:
if this.migrationContext.UseGTIDs {
this.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)}
} else {
this.LastTrxCoords = this.currentCoordinates.Clone()
}
case *replication.RowsEvent:
if err := this.handleRowsEvent(ev, event, entriesChannel); err != nil {
return err
}
}
}
this.migrationContext.Log.Debugf("done streaming events")

return nil
eventChannel <- ev
}
}

func (this *GoMySQLReader) Close() error {
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func main() {
flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
flag.IntVar(&migrationContext.NumWorkers, "workers", 8, "Number of concurrent workers for applying DML events. Each worker uses one goroutine.")

maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
Expand Down
4 changes: 3 additions & 1 deletion go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,14 @@ func (this *Applier) compileMigrationKeyWarningRegex() (*regexp.Regexp, error) {
return migrationKeyRegex, nil
}

func (this *Applier) InitDBConnections() (err error) {
func (this *Applier) InitDBConnections(maxConns int) (err error) {
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
uriWithMulti := fmt.Sprintf("%s&multiStatements=true", applierUri)
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, uriWithMulti); err != nil {
return err
}
this.db.SetMaxOpenConns(maxConns)
this.db.SetMaxIdleConns(maxConns)
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
Expand Down
34 changes: 17 additions & 17 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (suite *ApplierTestSuite) TestInitDBConnections() {
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

mysqlVersion, _ := strings.CutPrefix(testMysqlContainerImage, "mysql:")
Expand Down Expand Up @@ -374,7 +374,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

dmlEvents := []*binlog.BinlogDMLEvent{
Expand Down Expand Up @@ -431,7 +431,7 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

err = applier.ValidateOrDropExistingTables()
Expand Down Expand Up @@ -463,7 +463,7 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

err = applier.ValidateOrDropExistingTables()
Expand Down Expand Up @@ -494,7 +494,7 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

err = applier.ValidateOrDropExistingTables()
Expand Down Expand Up @@ -532,7 +532,7 @@ func (suite *ApplierTestSuite) TestCreateGhostTable() {
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

err = applier.CreateGhostTable()
Expand Down Expand Up @@ -586,7 +586,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, item_id) VALUES (123456, 42);", getTestTableName()))
Expand Down Expand Up @@ -676,7 +676,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai
}
applier := NewApplier(migrationContext)

err = applier.InitDBConnections()
err = applier.InitDBConnections(1)
suite.Require().NoError(err)

err = applier.CreateChangelogTable()
Expand Down Expand Up @@ -743,7 +743,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {

applier := NewApplier(migrationContext)

err = applier.InitDBConnections()
err = applier.InitDBConnections(1)
suite.Require().NoError(err)

err = applier.CreateChangelogTable()
Expand Down Expand Up @@ -825,7 +825,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigration
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(1)
suite.Require().NoError(err)

// Insert initial rows into ghost table (simulating bulk copy phase)
Expand Down Expand Up @@ -914,7 +914,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateCompositeUniqueKe
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(1)
suite.Require().NoError(err)

// Insert initial rows into ghost table (simulating bulk copy phase)
Expand Down Expand Up @@ -1016,7 +1016,7 @@ func (suite *ApplierTestSuite) TestUpdateModifyingUniqueKeyWithDuplicateOnOtherI
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(1)
suite.Require().NoError(err)

// Setup: Insert initial rows into ghost table
Expand Down Expand Up @@ -1111,7 +1111,7 @@ func (suite *ApplierTestSuite) TestNormalUpdateWithPanicOnWarnings() {
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(1)
suite.Require().NoError(err)

// Setup: Insert initial rows into ghost table
Expand Down Expand Up @@ -1191,7 +1191,7 @@ func (suite *ApplierTestSuite) TestDuplicateOnMigrationKeyAllowedInBinlogReplay(
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(1)
suite.Require().NoError(err)

// Insert initial rows into ghost table (simulating bulk copy phase)
Expand Down Expand Up @@ -1282,7 +1282,7 @@ func (suite *ApplierTestSuite) TestRegexMetacharactersInIndexName() {
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(1)
suite.Require().NoError(err)

// Insert initial rows
Expand Down Expand Up @@ -1384,7 +1384,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() {
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(1)
suite.Require().NoError(err)

// Insert initial rows into ghost table
Expand Down Expand Up @@ -1473,7 +1473,7 @@ func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() {
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(1)
suite.Require().NoError(err)

// Insert initial rows into ghost table
Expand Down
Loading
Loading