Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded replication WIP #1454

Draft
wants to merge 57 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
37c1abd
WIP.
arthurschreiber Sep 30, 2024
5d69fd9
Fixes.
arthurschreiber Sep 30, 2024
df29aa8
Fixups.
arthurschreiber Sep 30, 2024
81a6e6e
Special handling for first job.
arthurschreiber Sep 30, 2024
b44c6df
Fix deadlocks.
arthurschreiber Sep 30, 2024
55ce5a2
Update docs and simulate changes.
arthurschreiber Sep 30, 2024
0d8f39b
start StreamTransaction implementation
meiji163 Oct 3, 2024
22a4b8c
add job Coordinator
meiji163 Oct 3, 2024
3fd6e71
fix job complete logic
meiji163 Oct 3, 2024
f209592
add docker compose
meiji163 Oct 4, 2024
c6c877e
fix application of applyEvent.writeFunc
meiji163 Oct 4, 2024
b031166
Send off transaction events as soon as we see a GTID event.
arthurschreiber Oct 7, 2024
f81a790
WIP.
arthurschreiber Oct 8, 2024
e3b2cda
WIP.
arthurschreiber Oct 9, 2024
3ba9058
fix MarkTransactionCompleted
meiji163 Oct 10, 2024
74f6c9c
go mod tidy
meiji163 Oct 10, 2024
eddc1c9
configure max idle connections
meiji163 Oct 10, 2024
6321e73
vendor packages
meiji163 Oct 10, 2024
5600b91
track binlog coords
meiji163 Oct 10, 2024
3f47ebd
binlog streamer reconnect
meiji163 Oct 16, 2024
e81aabf
fix TestMigrate
meiji163 Oct 16, 2024
e4da5f8
worker-stats command
meiji163 Oct 17, 2024
c5e239c
setup mysql in CI
meiji163 Oct 17, 2024
2e78f6f
set transaction_write_set_extraction in test
meiji163 Oct 17, 2024
875d00d
add ci mysql opt flags
meiji163 Oct 17, 2024
9a022e2
Revert "set transaction_write_set_extraction in test"
meiji163 Oct 17, 2024
18dcee1
change ci mysql opts
meiji163 Oct 17, 2024
299df37
try custom docker run
meiji163 Oct 17, 2024
941689f
Make the linter happy.
arthurschreiber Oct 21, 2024
9e3bc1c
Use testcontainers.
arthurschreiber Oct 21, 2024
113e674
WIP.
arthurschreiber Oct 21, 2024
8e38b86
Merge branch 'master' of https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 21, 2024
d7ccab9
Merge branch 'master' of https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 21, 2024
06c7082
fix 2 tests, TestMigrate still not working
meiji163 Oct 21, 2024
d7dc97b
Fix applier tests.
arthurschreiber Oct 21, 2024
bcc7e7a
Fix migrator tests.
arthurschreiber Oct 21, 2024
b747d98
Add error assertions.
arthurschreiber Oct 21, 2024
1a5be0b
🔥
arthurschreiber Oct 21, 2024
1942455
Fix applier connection pool size.
arthurschreiber Oct 22, 2024
85cab4d
Merge branch 'master' into meiji163/parallel-repl
arthurschreiber Oct 23, 2024
2ba0cb2
Fix merge conflict.
arthurschreiber Oct 23, 2024
b82e8f9
Prepare queries.
arthurschreiber Oct 23, 2024
fa7c484
pass throttler to Coordinator
meiji163 Oct 24, 2024
126c981
track time waiting on event channels
meiji163 Oct 24, 2024
641fe92
add flag for number of Coordinator workers
meiji163 Oct 25, 2024
615d1df
Merge branch 'master' of https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 25, 2024
0a8787e
Fix test case.
arthurschreiber Oct 25, 2024
6aaa374
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 18, 2024
f6ec835
remove unused streamer
meiji163 Nov 18, 2024
0555d72
fix coordinator test
meiji163 Nov 18, 2024
569d035
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 18, 2024
9b5e2bc
fix Coordinator test
meiji163 Nov 19, 2024
6689aeb
fix migrator test
meiji163 Nov 19, 2024
53e953d
linter fix
meiji163 Nov 19, 2024
e1ca9cd
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 22, 2024
b34f2e2
Merge branch 'master' into meiji163/parallel-repl
meiji163 Dec 19, 2024
adfcdf7
Merge branch 'master' into meiji163/parallel-repl
meiji163 Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ type MigrationContext struct {
CutOverType CutOver
ReplicaServerId uint

// Number of workers used by the trx coordinator
NumWorkers int

Hostname string
AssumeMasterHostname string
ApplierTimeZone string
Expand Down
87 changes: 9 additions & 78 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
package binlog

import (
"fmt"
"sync"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"

"time"

Expand Down Expand Up @@ -76,68 +74,17 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinate
return &returnCoordinates
}

// StreamEvents
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
if this.currentCoordinates.IsLogPosOverflowBeyond4Bytes(&this.LastAppliedRowsEventHint) {
return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", this.currentCoordinates)
}

if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
this.migrationContext.Log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
return nil
}

dml := ToEventDML(ev.Header.EventType.String())
if dml == NotDML {
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
}
for i, row := range rowsEvent.Rows {
if dml == UpdateDML && i%2 == 1 {
// An update has two rows (WHERE+SET)
// We do both at the same time
continue
}
binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
binlogEntry.DmlEvent = NewBinlogDMLEvent(
string(rowsEvent.Table.Schema),
string(rowsEvent.Table.Table),
dml,
)
switch dml {
case InsertDML:
{
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
}
case UpdateDML:
{
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
}
case DeleteDML:
{
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
}
}
// The channel will do the throttling. Whoever is reading from the channel
// decides whether action is taken synchronously (meaning we wait before
// next iteration) or asynchronously (we keep pushing more events)
// In reality, reads will be synchronous
entriesChannel <- binlogEntry
}
this.LastAppliedRowsEventHint = this.currentCoordinates
return nil
}

// StreamEvents
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
if canStopStreaming() {
return nil
}
// StreamEvents reads binlog events and sends them to the given channel.
// It is blocking and should be executed in a goroutine.
func (this *GoMySQLReader) StreamEvents(ctx context.Context, canStopStreaming func() bool, eventChannel chan<- *replication.BinlogEvent) error {
for {
if canStopStreaming() {
break
return nil
}
if err := ctx.Err(); err != nil {
return err
}
ev, err := this.binlogStreamer.GetEvent(context.Background())
ev, err := this.binlogStreamer.GetEvent(ctx)
if err != nil {
return err
}
Expand All @@ -147,24 +94,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
this.currentCoordinates.EventSize = int64(ev.Header.EventSize)
}()

switch binlogEvent := ev.Event.(type) {
case *replication.RotateEvent:
func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogFile = string(binlogEvent.NextLogName)
}()
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), binlogEvent.NextLogName)
case *replication.RowsEvent:
if err := this.handleRowsEvent(ev, binlogEvent, entriesChannel); err != nil {
return err
}
}
eventChannel <- ev
}
this.migrationContext.Log.Debugf("done streaming events")

return nil
}

func (this *GoMySQLReader) Close() error {
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func main() {
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
flag.IntVar(&migrationContext.NumWorkers, "workers", 8, "Number of concurrent workers for applying DML events. Each worker uses one goroutine.")

maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
Expand Down
4 changes: 3 additions & 1 deletion go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
}
}

func (this *Applier) InitDBConnections() (err error) {
func (this *Applier) InitDBConnections(maxConns int) (err error) {
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
uriWithMulti := fmt.Sprintf("%s&multiStatements=true", applierUri)
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, uriWithMulti); err != nil {
return err
}
this.db.SetMaxOpenConns(maxConns)
this.db.SetMaxIdleConns(maxConns)
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
Expand Down
12 changes: 6 additions & 6 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (suite *ApplierTestSuite) TestInitDBConnections() {
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

suite.Require().Equal("8.0.40", migrationContext.ApplierMySQLVersion)
Expand Down Expand Up @@ -313,7 +313,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

dmlEvents := []*binlog.BinlogDMLEvent{
Expand Down Expand Up @@ -373,7 +373,7 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

err = applier.ValidateOrDropExistingTables()
Expand Down Expand Up @@ -408,7 +408,7 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

err = applier.ValidateOrDropExistingTables()
Expand Down Expand Up @@ -442,7 +442,7 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

err = applier.ValidateOrDropExistingTables()
Expand Down Expand Up @@ -483,7 +483,7 @@ func (suite *ApplierTestSuite) TestCreateGhostTable() {
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

err = applier.CreateGhostTable()
Expand Down
Loading
Loading