Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for dynamic chunk size #1224

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions doc/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant

`gh-ost` will automatically fallback to the normal DDL process if the attempt to use instant DDL is unsuccessful.

### chunk-size

Chunk size is the number of rows to copy in a single batch for copying data from the original table to the ghost table. The default value is 1000. Increasing the chunk-size can improve performance (via more batching) but also increases the risk of replica delay.

See also: [`dynamic-chunking`](#dynamic-chunking)

### conf

`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
Expand Down Expand Up @@ -122,6 +128,27 @@ Why is this behavior configurable? Different workloads have different characteri

Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size.

### dynamic-chunking

Dynamic chunking (default: `OFF`) is a feature that allows `gh-ost` to automatically increase or decrease the `--chunk-size` up to 50x, based on the execution time of previous copy-row operations. The goal is to find the optimal batch size to reach `--dynamic-chunk-size-target-millis` (default: 50).

For example, assume `--chunk-size=1000`, `--dynamic-chunking=true` and `--dynamic-chunk-size-target-millis=50`:

- The actual "target" chunk size used will always be in the range of `[20,50000]` (within 50x the chunk size)
- Approximately every 1 second, `gh-ost` will re-assess if the target chunk size is optimal based on the `p90` of recent executions.
- Increases in target chunk size are scaled up by no more than 50% of the current target size at a time.
- If any copy-row operations exceed 250ms (5x the target), the target chunk size is immediately reduced to 10% of its current value.

Enabling dynamic chunk size can be more reliable than the static `--chunk-size=N`, because tables are not created equally. For a table with a very high number of columns and several indexes, `1000` rows may actually be too large of a chunk size. Similarly, for a table with very few columns and no indexes, the ideal batch size may be 20K+ rows (while still being under the 50ms target).

See also: [`chunk-size`](#chunk-size), [`dynamic-chunk-size-target-millis`](#dynamic-chunk-size-target-millis)

### dynamic-chunk-size-target-millis

The target execution time for each copy-row operation when [`--dynamic-chunking`](#dynamic-chunking) (default: `OFF`) is enabled.

The default value of `50` is a good starting point for most workloads. If you find that read-replicas are intermittently falling behind, you may want to decrease this value. Similarly, if you do not use read-replicas there may be a benefit from increasing this value slightly. The recommended range is `[10,10000]`. Values larger than this have limited added benefit and are not recommended.

### exact-rowcount

A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is?
Expand Down
117 changes: 114 additions & 3 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ const (
HTTPStatusOK = 200
MaxEventsBatchSize = 1000
ETAUnknown = math.MinInt64

// MaxDynamicScaleFactor is the maximum factor dynamic scaling can change the chunkSize from
// the setting chunkSize. For example, if the factor is 10, and chunkSize is 1000, then the
// values will be in the range of 100 to 10000.
MaxDynamicScaleFactor = 50
// MaxDynamicStepFactor is the maximum amount each recalculation of the dynamic chunkSize can
// increase by. For example, if the newTarget is 5000 but the current target is 1000, the newTarget
// will be capped back down to 1500. Over time the number 5000 will be reached, but not straight away.
MaxDynamicStepFactor = 1.5
// MinDynamicChunkSize is the minimum chunkSize that can be used when dynamic chunkSize is enabled.
// This helps prevent a scenario where the chunk size is too small (it can never be less than 1).
MinDynamicRowSize = 10
// DynamicPanicFactor is the factor by which the feedback process takes immediate action when
// the chunkSize appears to be too large. For example, if the PanicFactor is 5, and the target *time*
// is 50ms, an actual time 250ms+ will cause the dynamic chunk size to immediately be reduced.
DynamicPanicFactor = 5
)

var (
Expand Down Expand Up @@ -118,7 +134,7 @@ type MigrationContext struct {

HeartbeatIntervalMilliseconds int64
defaultNumRetries int64
ChunkSize int64
chunkSize int64
niceRatio float64
MaxLagMillisecondsThrottleThreshold int64
throttleControlReplicaKeys *mysql.InstanceKeyMap
Expand Down Expand Up @@ -146,6 +162,12 @@ type MigrationContext struct {
HooksHintToken string
HooksStatusIntervalSec int64

DynamicChunking bool
DynamicChunkSizeTargetMillis int64
targetChunkSizeMutex sync.Mutex
targetChunkFeedback []time.Duration
targetChunkSize int64

DropServeSocket bool
ServeSocketFile string
ServeTCPPort int64
Expand Down Expand Up @@ -269,7 +291,7 @@ func NewMigrationContext() *MigrationContext {
return &MigrationContext{
Uuid: uuid.NewV4().String(),
defaultNumRetries: 60,
ChunkSize: 1000,
chunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1500,
Expand All @@ -287,6 +309,7 @@ func NewMigrationContext() *MigrationContext {
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
Log: NewDefaultLogger(),
DynamicChunking: false,
}
}

Expand Down Expand Up @@ -554,6 +577,94 @@ func (this *MigrationContext) GetTotalRowsCopied() int64 {
return atomic.LoadInt64(&this.TotalRowsCopied)
}

// ChunkDurationFeedback collects samples from copy-rows tasks, and feeds them
// back into a moving p90 that is used to return a more precise value
// in GetChunkSize() calls. Usually we wait for multiple samples and then recalculate
// in GetChunkSize(), however if the input value far exceeds what was expected (>5x)
// we synchronously reduce the chunk size. If it was a one off, it's not an issue
// because the next few samples will always scale the value back up.
func (this *MigrationContext) ChunkDurationFeedback(d time.Duration) (outOfRange bool) {
if !this.DynamicChunking {
return false
}
this.targetChunkSizeMutex.Lock()
defer this.targetChunkSizeMutex.Unlock()
if int64(d) > (this.DynamicChunkSizeTargetMillis * DynamicPanicFactor * int64(time.Millisecond)) {
this.targetChunkFeedback = []time.Duration{}
newTarget := float64(this.targetChunkSize) / float64(DynamicPanicFactor*2)
this.targetChunkSize = this.boundaryCheckTargetChunkSize(newTarget)
return true // don't include in feedback
}
this.targetChunkFeedback = append(this.targetChunkFeedback, d)
return false
}

// calculateNewTargetChunkSize is called by GetChunkSize()
// under a mutex. It's safe to read this.targetchunkFeedback.
func (this *MigrationContext) calculateNewTargetChunkSize() int64 {
// We do all our math as float64 of time in ns
p90 := float64(lazyFindP90(this.targetChunkFeedback))
targetTime := float64(this.DynamicChunkSizeTargetMillis * int64(time.Millisecond))
newTargetRows := float64(this.targetChunkSize) * (targetTime / p90)
return this.boundaryCheckTargetChunkSize(newTargetRows)
}

// boundaryCheckTargetChunkSize makes sure the new target is not
// too large/small since we are only allowed to scale up/down 50x from
// the original ("reference") chunk size, and only permitted to increase
// by 50% at a time. This is called under a mutex.
func (this *MigrationContext) boundaryCheckTargetChunkSize(newTargetRows float64) int64 {
referenceSize := float64(atomic.LoadInt64(&this.chunkSize))
if newTargetRows < (referenceSize / MaxDynamicScaleFactor) {
newTargetRows = referenceSize / MaxDynamicScaleFactor
}
if newTargetRows > (referenceSize * MaxDynamicScaleFactor) {
newTargetRows = referenceSize * MaxDynamicScaleFactor
}
if newTargetRows > float64(this.targetChunkSize)*MaxDynamicStepFactor {
newTargetRows = float64(this.targetChunkSize) * MaxDynamicStepFactor
}
if newTargetRows < MinDynamicRowSize {
newTargetRows = MinDynamicRowSize
}
return int64(newTargetRows)
}

// GetChunkSize returns the number of rows to copy in a single chunk:
// - If Dynamic Chunking is disabled, it will return this.chunkSize.
// - If Dynamic Chunking is enabled, it will return a value that
// automatically adjusts based on the duration of the last few
// copy-rows tasks.
//
// Historically gh-ost has used a static chunk size (i.e. 1000 rows)
// which can be adjusted while gh-ost is running.
// An ideal chunk size is large enough that it can batch operations,
// but small enough that it doesn't cause spikes in replica lag.
//
// The problem with basing the configurable on row-size is two fold:
// - Fow very narrow rows, it's not enough (leaving performance on the table).
// - For very wide rows (or with many secondary indexes) 1000 might be too high!
//
// Dynamic chunking addresses this by using row-size as a starting point,
// *but* the main configurable is based on time (in ms).
func (this *MigrationContext) GetChunkSize() int64 {
if !this.DynamicChunking {
return atomic.LoadInt64(&this.chunkSize)
}
this.targetChunkSizeMutex.Lock()
defer this.targetChunkSizeMutex.Unlock()
if this.targetChunkSize == 0 {
this.targetChunkSize = atomic.LoadInt64(&this.chunkSize)
}
// We need 10 samples to make a decision because we
// calculate it from the p90 (i.e. 2nd to highest value).
if len(this.targetChunkFeedback) >= 10 {
this.targetChunkSize = this.calculateNewTargetChunkSize()
this.targetChunkFeedback = []time.Duration{} // reset
}
return this.targetChunkSize
}

func (this *MigrationContext) GetIteration() int64 {
return atomic.LoadInt64(&this.Iteration)
}
Expand Down Expand Up @@ -611,7 +722,7 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
if chunkSize > 100000 {
chunkSize = 100000
}
atomic.StoreInt64(&this.ChunkSize, chunkSize)
atomic.StoreInt64(&this.chunkSize, chunkSize)
}

func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
Expand Down
75 changes: 75 additions & 0 deletions go/base/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,78 @@ func TestReadConfigFile(t *testing.T) {
}
}
}

func TestDynamicChunker(t *testing.T) {
context := NewMigrationContext()
context.chunkSize = 1000
context.DynamicChunking = true
context.DynamicChunkSizeTargetMillis = 50

// Before feedback it should match the static chunk size
test.S(t).ExpectEquals(context.GetChunkSize(), int64(1000))

// 1s is >5x the target, so it should immediately /10 the target
context.ChunkDurationFeedback(1 * time.Second)
test.S(t).ExpectEquals(context.GetChunkSize(), int64(100))

// Let's provide 10 pieces of feedback, and see the chunk size
// be adjusted based on the p90th value.
context.ChunkDurationFeedback(33 * time.Millisecond) // 1st
context.ChunkDurationFeedback(33 * time.Millisecond) // 2nd
context.ChunkDurationFeedback(32 * time.Millisecond) // 3rd
context.ChunkDurationFeedback(40 * time.Millisecond)
context.ChunkDurationFeedback(61 * time.Millisecond)
context.ChunkDurationFeedback(37 * time.Millisecond)
context.ChunkDurationFeedback(38 * time.Millisecond)
context.ChunkDurationFeedback(35 * time.Millisecond)
context.ChunkDurationFeedback(29 * time.Millisecond)
test.S(t).ExpectEquals(context.GetChunkSize(), int64(100)) // 9th
context.ChunkDurationFeedback(38 * time.Millisecond) // 10th
// Because 10 items of feedback have been received,
// the chunk size is recalculated. The p90 is 40ms (below our target)
// so the adjusted chunk size increases 25% to 125
test.S(t).ExpectEquals(context.GetChunkSize(), int64(125))

// Collect some new feedback where the p90 is 500us (much lower than our target)
// We have boundary checking on the value which limits it to 50% greater
// than the previous chunk size.

context.ChunkDurationFeedback(400 * time.Microsecond)
context.ChunkDurationFeedback(450 * time.Microsecond)
context.ChunkDurationFeedback(470 * time.Microsecond)
context.ChunkDurationFeedback(520 * time.Microsecond)
context.ChunkDurationFeedback(500 * time.Microsecond)
context.ChunkDurationFeedback(490 * time.Microsecond)
context.ChunkDurationFeedback(300 * time.Microsecond)
context.ChunkDurationFeedback(450 * time.Microsecond)
context.ChunkDurationFeedback(460 * time.Microsecond)
context.ChunkDurationFeedback(480 * time.Microsecond)
test.S(t).ExpectEquals(context.GetChunkSize(), int64(187)) // very minor increase

// Test that the chunk size is not allowed to grow larger than 50x
// the original chunk size. Because of the gradual step up, we need to
// provide a lot of feedback first.
for i := 0; i < 1000; i++ {
context.ChunkDurationFeedback(480 * time.Microsecond)
context.GetChunkSize()
}
test.S(t).ExpectEquals(context.GetChunkSize(), int64(50000))

// Similarly, the minimum chunksize is 1000/50=20 rows no matter what the feedback.
// The downscaling rule is /10 for values that immediately exceed 5x the target,
// so it usually scales down before the feedback re-evaluation kicks in.
for i := 0; i < 100; i++ {
context.ChunkDurationFeedback(10 * time.Second)
context.GetChunkSize()
}
test.S(t).ExpectEquals(context.GetChunkSize(), int64(20))

// If we set the chunkSize to 100, then 100/50=2 is the minimum.
// But there is a hard coded minimum of 10 rows for safety.
context.chunkSize = 100
for i := 0; i < 100; i++ {
context.ChunkDurationFeedback(10 * time.Second)
context.GetChunkSize()
}
test.S(t).ExpectEquals(context.GetChunkSize(), int64(10))
}
11 changes: 11 additions & 0 deletions go/base/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"os"
"regexp"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -93,3 +94,13 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
return "", fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", port, extraPort)
}
}

// lazyFindP90 finds the second to last value in a slice.
// This is the same as a p90 if there are 10 values, but if
// there were 100 values it would technically be a p99 etc.
func lazyFindP90(a []time.Duration) time.Duration {
sort.Slice(a, func(i, j int) bool {
return a[i] > a[j]
})
return a[len(a)/10]
}
17 changes: 17 additions & 0 deletions go/base/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package base

import (
"testing"
"time"

"github.com/openark/golib/log"
test "github.com/openark/golib/tests"
Expand All @@ -27,3 +28,19 @@ func TestStringContainsAll(t *testing.T) {
test.S(t).ExpectTrue(StringContainsAll(s, "insert", ""))
test.S(t).ExpectTrue(StringContainsAll(s, "insert", "update", "delete"))
}

func TestFindP90(t *testing.T) {
times := []time.Duration{
1 * time.Second,
2 * time.Second,
1 * time.Second,
3 * time.Second,
10 * time.Second,
1 * time.Second,
1 * time.Second,
1 * time.Second,
1 * time.Second,
1 * time.Second,
}
test.S(t).ExpectEquals(lazyFindP90(times), 3*time.Second)
}
2 changes: 2 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ func main() {
flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').")
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
flag.BoolVar(&migrationContext.DynamicChunking, "dynamic-chunking", false, "automatically adjust the chunk size based on a time-target")
flag.Int64Var(&migrationContext.DynamicChunkSizeTargetMillis, "dynamic-chunk-size-target-millis", 50, "target duration of a chunk when dynamic-chunking is enabled")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
Expand Down
4 changes: 2 additions & 2 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetChunkSize(),
this.migrationContext.GetIteration() == 0,
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
Expand Down Expand Up @@ -614,7 +614,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
// data actually gets copied from original table.
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
chunkSize = this.migrationContext.GetChunkSize()

query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
this.migrationContext.DatabaseName,
Expand Down
9 changes: 7 additions & 2 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
maxLoad := this.migrationContext.GetMaxLoad()
criticalLoad := this.migrationContext.GetCriticalLoad()
fmt.Fprintf(w, "# chunk-size: %+v; max-lag-millis: %+vms; dml-batch-size: %+v; max-load: %s; critical-load: %s; nice-ratio: %f\n",
atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetChunkSize(),
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
atomic.LoadInt64(&this.migrationContext.DMLBatchSize),
maxLoad.String(),
Expand Down Expand Up @@ -1315,8 +1315,13 @@ func (this *Migrator) executeWriteFuncs() error {
if err := copyRowsFunc(); err != nil {
return this.migrationContext.Log.Errore(err)
}
// Send feedback to the chunker.
copyRowsDuration := time.Since(copyRowsStartTime)
outOfRange := this.migrationContext.ChunkDurationFeedback(copyRowsDuration)
if outOfRange {
this.migrationContext.Log.Warningf("Chunk duration took: %s, throttling copy-rows", copyRowsDuration)
}
if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 {
copyRowsDuration := time.Since(copyRowsStartTime)
sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds())
sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond
time.Sleep(sleepTime)
Expand Down
2 changes: 1 addition & 1 deletion go/logic/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ help # This message
case "chunk-size":
{
if argIsQuestion {
fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.ChunkSize))
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetChunkSize())
return NoPrintStatusRule, nil
}
if chunkSize, err := strconv.Atoi(arg); err != nil {
Expand Down