Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded replication WIP #1454

Draft
wants to merge 57 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
37c1abd
WIP.
arthurschreiber Sep 30, 2024
5d69fd9
Fixes.
arthurschreiber Sep 30, 2024
df29aa8
Fixups.
arthurschreiber Sep 30, 2024
81a6e6e
Special handling for first job.
arthurschreiber Sep 30, 2024
b44c6df
Fix deadlocks.
arthurschreiber Sep 30, 2024
55ce5a2
Update docs and simulate changes.
arthurschreiber Sep 30, 2024
0d8f39b
start StreamTransaction implementation
meiji163 Oct 3, 2024
22a4b8c
add job Coordinator
meiji163 Oct 3, 2024
3fd6e71
fix job complete logic
meiji163 Oct 3, 2024
f209592
add docker compose
meiji163 Oct 4, 2024
c6c877e
fix application of applyEvent.writeFunc
meiji163 Oct 4, 2024
b031166
Send off transaction events as soon as we see a GTID event.
arthurschreiber Oct 7, 2024
f81a790
WIP.
arthurschreiber Oct 8, 2024
e3b2cda
WIP.
arthurschreiber Oct 9, 2024
3ba9058
fix MarkTransactionCompleted
meiji163 Oct 10, 2024
74f6c9c
go mod tidy
meiji163 Oct 10, 2024
eddc1c9
configure max idle connections
meiji163 Oct 10, 2024
6321e73
vendor packages
meiji163 Oct 10, 2024
5600b91
track binlog coords
meiji163 Oct 10, 2024
3f47ebd
binlog streamer reconnect
meiji163 Oct 16, 2024
e81aabf
fix TestMigrate
meiji163 Oct 16, 2024
e4da5f8
worker-stats command
meiji163 Oct 17, 2024
c5e239c
setup mysql in CI
meiji163 Oct 17, 2024
2e78f6f
set transaction_write_set_extraction in test
meiji163 Oct 17, 2024
875d00d
add ci mysql opt flags
meiji163 Oct 17, 2024
9a022e2
Revert "set transaction_write_set_extraction in test"
meiji163 Oct 17, 2024
18dcee1
change ci mysql opts
meiji163 Oct 17, 2024
299df37
try custom docker run
meiji163 Oct 17, 2024
941689f
Make the linter happy.
arthurschreiber Oct 21, 2024
9e3bc1c
Use testcontainers.
arthurschreiber Oct 21, 2024
113e674
WIP.
arthurschreiber Oct 21, 2024
8e38b86
Merge branch 'master' of https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 21, 2024
d7ccab9
Merge branch 'master' of https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 21, 2024
06c7082
fix 2 tests, TestMigrate still not working
meiji163 Oct 21, 2024
d7dc97b
Fix applier tests.
arthurschreiber Oct 21, 2024
bcc7e7a
Fix migrator tests.
arthurschreiber Oct 21, 2024
b747d98
Add error assertions.
arthurschreiber Oct 21, 2024
1a5be0b
🔥
arthurschreiber Oct 21, 2024
1942455
Fix applier connection pool size.
arthurschreiber Oct 22, 2024
85cab4d
Merge branch 'master' into meiji163/parallel-repl
arthurschreiber Oct 23, 2024
2ba0cb2
Fix merge conflict.
arthurschreiber Oct 23, 2024
b82e8f9
Prepare queries.
arthurschreiber Oct 23, 2024
fa7c484
pass throttler to Coordinator
meiji163 Oct 24, 2024
126c981
track time waiting on event channels
meiji163 Oct 24, 2024
641fe92
add flag for number of Coordinator workers
meiji163 Oct 25, 2024
615d1df
Merge branch 'master' of https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 25, 2024
0a8787e
Fix test case.
arthurschreiber Oct 25, 2024
6aaa374
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 18, 2024
f6ec835
remove unused streamer
meiji163 Nov 18, 2024
0555d72
fix coordinator test
meiji163 Nov 18, 2024
569d035
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 18, 2024
9b5e2bc
fix Coordinator test
meiji163 Nov 19, 2024
6689aeb
fix migrator test
meiji163 Nov 19, 2024
53e953d
linter fix
meiji163 Nov 19, 2024
e1ca9cd
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 22, 2024
b34f2e2
Merge branch 'master' into meiji163/parallel-repl
meiji163 Dec 19, 2024
adfcdf7
Merge branch 'master' into meiji163/parallel-repl
meiji163 Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ type MigrationContext struct {
CutOverType CutOver
ReplicaServerId uint

// Number of workers used by the Coordinator
NumWorkers int

Hostname string
AssumeMasterHostname string
ApplierTimeZone string
Expand Down
188 changes: 187 additions & 1 deletion go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package binlog

import (
"bytes"
"fmt"
"sync"

Expand Down Expand Up @@ -76,7 +77,7 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinate
return &returnCoordinates
}

// StreamEvents
// handleRowsEvents processes a RowEvent from the binlog and sends the DML event to the entriesChannel.
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
if this.currentCoordinates.IsLogPosOverflowBeyond4Bytes(&this.LastAppliedRowsEventHint) {
return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", this.currentCoordinates)
Expand Down Expand Up @@ -105,6 +106,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
)
switch dml {
case InsertDML:

{
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
}
Expand All @@ -128,6 +130,190 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
return nil
}

// RowsEventToBinlogEntry processes MySQL RowsEvent into our BinlogEntry for later application.
// copied from handleRowEvents
func RowsEventToBinlogEntry(eventType replication.EventType, rowsEvent *replication.RowsEvent, binlogCoords mysql.BinlogCoordinates) (*BinlogEntry, error) {
dml := ToEventDML(eventType.String())
if dml == NotDML {
return nil, fmt.Errorf("Unknown DML type: %s", eventType.String())
}
binlogEntry := NewBinlogEntryAt(binlogCoords)
for i, row := range rowsEvent.Rows {
if dml == UpdateDML && i%2 == 1 {
// An update has two rows (WHERE+SET)
// We do both at the same time
continue
}
binlogEntry.DmlEvent = NewBinlogDMLEvent(
string(rowsEvent.Table.Schema),
string(rowsEvent.Table.Table),
dml,
)
switch dml {
case InsertDML:
{
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
}
case UpdateDML:
{
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
}
case DeleteDML:
{
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
}
}
}
return binlogEntry, nil
}

type Transaction struct {
SequenceNumber int64
LastCommitted int64
Changes chan *BinlogEntry
}

func (this *GoMySQLReader) StreamTransactions(ctx context.Context, transactionsChannel chan<- *Transaction) error {
if err := ctx.Err(); err != nil {
return err
}

previousSequenceNumber := int64(0)

groups:
for {
if err := ctx.Err(); err != nil {
return err
}

ev, err := this.binlogStreamer.GetEvent(ctx)
if err != nil {
return err
}
func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
this.currentCoordinates.EventSize = int64(ev.Header.EventSize)
}()

fmt.Printf("Event: %s\n", ev.Header.EventType)

// Read each event and do something with it
//
// First, ignore all events until we find the next GTID event so that we can start
// at a transaction boundary.
//
// Once we find a GTID event, we can start an event group,
// and then process all events in that group.
// An event group is defined as all events that are part of the same transaction,
// which is defined as all events between the GTID event, a `QueryEvent` containing a `BEGIN` query and ends with
// either a XIDEvent or a `QueryEvent` containing a `COMMIT` or `ROLLBACK` query.
//
// Each group is a struct containing the SequenceNumber, LastCommitted, and a channel of events.
//
// Once the group has ended, we can start looking for the next GTID event.

var group *Transaction
switch binlogEvent := ev.Event.(type) {
case *replication.GTIDEvent:
this.migrationContext.Log.Infof("GTIDEvent: %+v", binlogEvent)

// Bail out if we find a gap in the sequence numbers
if previousSequenceNumber != 0 && binlogEvent.SequenceNumber != previousSequenceNumber+1 {
return fmt.Errorf("unexpected sequence number: %d, expected %d", binlogEvent.SequenceNumber, previousSequenceNumber+1)
}

group = &Transaction{
SequenceNumber: binlogEvent.SequenceNumber,
LastCommitted: binlogEvent.LastCommitted,
Changes: make(chan *BinlogEntry, 1000),
}

previousSequenceNumber = binlogEvent.SequenceNumber

// We are good to send the transaction, the transaction events arrive async
this.migrationContext.Log.Infof("sending transaction: %d %d", group.SequenceNumber, group.LastCommitted)
transactionsChannel <- group
default:
this.migrationContext.Log.Infof("Ignoring Event: %+v", ev.Event)
continue
}

// Next event should be a query event

ev, err = this.binlogStreamer.GetEvent(ctx)
if err != nil {
close(group.Changes)
return err
}
this.migrationContext.Log.Infof("1 - Event: %s", ev.Header.EventType)

switch binlogEvent := ev.Event.(type) {
case *replication.QueryEvent:
if bytes.Equal([]byte("BEGIN"), binlogEvent.Query) {
this.migrationContext.Log.Infof("BEGIN for transaction in schema %s", binlogEvent.Schema)
} else {
this.migrationContext.Log.Infof("QueryEvent: %+v", binlogEvent)
this.migrationContext.Log.Infof("Query: %s", binlogEvent.Query)

close(group.Changes)

// wait for the next event group
continue groups
}
default:
this.migrationContext.Log.Infof("unexpected Event: %+v", ev.Event)
close(group.Changes)

// TODO: handle the group - we want to make sure we process the group's LastCommitted and SequenceNumber

// wait for the next event group
continue groups
}

// Next event should be a table map event

events:
// Now we can start processing the group
for {
ev, err = this.binlogStreamer.GetEvent(ctx)
if err != nil {
close(group.Changes)
return err
}
this.migrationContext.Log.Infof("3 - Event: %s", ev.Header.EventType)

switch binlogEvent := ev.Event.(type) {
case *replication.TableMapEvent:
this.migrationContext.Log.Infof("TableMapEvent for %s.%s: %+v", binlogEvent.Schema, binlogEvent.Table, binlogEvent)
case *replication.RowsEvent:
binlogEntry, err := RowsEventToBinlogEntry(ev.Header.EventType, binlogEvent, this.currentCoordinates)
if err != nil {
close(group.Changes)
return err
}
this.migrationContext.Log.Infof("RowsEvent: %v", binlogEvent)
group.Changes <- binlogEntry
this.migrationContext.Log.Infof("Length of group.Changes: %d", len(group.Changes))
case *replication.XIDEvent:
this.migrationContext.Log.Infof("XIDEvent: %+v", binlogEvent)
this.migrationContext.Log.Infof("COMMIT for transaction")
break events
default:
close(group.Changes)
this.migrationContext.Log.Infof("unexpected Event: %+v", ev.Event)
return fmt.Errorf("unexpected Event: %+v", ev.Event)
}
}

close(group.Changes)

this.migrationContext.Log.Infof("done processing group - %d events", len(group.Changes))
}
}

// StreamEvents
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
if canStopStreaming() {
Expand Down
5 changes: 4 additions & 1 deletion go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
}
}

func (this *Applier) InitDBConnections() (err error) {
func (this *Applier) InitDBConnections(maxConns int) (err error) {
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
return err
}
this.db.SetMaxOpenConns(maxConns)
this.db.SetMaxIdleConns(maxConns)
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
Expand Down Expand Up @@ -1231,6 +1233,7 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
return rollback(buildResult.err)
}
result, err := tx.Exec(buildResult.query, buildResult.args...)

if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args)
return rollback(err)
Expand Down
10 changes: 5 additions & 5 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (suite *ApplierTestSuite) TestInitDBConnections() {
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

suite.Require().Equal("8.0.40", migrationContext.ApplierMySQLVersion)
Expand Down Expand Up @@ -318,7 +318,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

rc, _, err := suite.mysqlContainer.Exec(ctx, []string{"mysql", "-uroot", "-proot-password", "-e", "CREATE TABLE test._testing_gho (id INT, item_id INT);"})
Expand Down Expand Up @@ -380,7 +380,7 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

err = applier.ValidateOrDropExistingTables()
Expand Down Expand Up @@ -426,7 +426,7 @@ func (suite *ApplierTestSuite) TestApplyIterationInsertQuery() {
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

chunkSize, rowsAffected, duration, err := applier.ApplyIterationInsertQuery()
Expand Down Expand Up @@ -492,7 +492,7 @@ func (suite *ApplierTestSuite) TestApplyIterationInsertQueryFailsFastWhenSelecti
applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
err = applier.InitDBConnections(8)
suite.Require().NoError(err)

// Lock one of the rows
Expand Down
Loading