Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: add indexes for mysql.analyze_jobs #58134

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(239), session.CurrentBootstrapVersion)
require.Equal(t, int64(240), session.CurrentBootstrapVersion)
}
25 changes: 24 additions & 1 deletion pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,11 +1203,15 @@ const (
// version 239
// add modify_params to tidb_global_task and tidb_global_task_history.
version239 = 239

// version 240
// Add indexes to mysql.analyze_jobs to speed up the query.
version240 = 240
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version239
var currentBootstrapVersion int64 = version240

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1382,6 +1386,7 @@ var (
upgradeToVer217,
upgradeToVer218,
upgradeToVer239,
upgradeToVer240,
}
)

Expand Down Expand Up @@ -3287,6 +3292,24 @@ func upgradeToVer239(s sessiontypes.Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.tidb_global_task_history ADD COLUMN modify_params json AFTER `error`;", infoschema.ErrColumnExists)
}

const (
// AddAnalyzeJobsSchemaTableStateIndex is a DDL statement that adds an index on (table_schema, table_name, state)
// columns to mysql.analyze_jobs table. This index is currently unused since queries filter on partition_name='',
// even for non-partitioned tables. It is kept for potential future optimization where queries could use this
// simpler index directly for non-partitioned tables.
AddAnalyzeJobsSchemaTableStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_state (table_schema, table_name, state)"
// AddAnalyzeJobsSchemaTablePartitionStateIndex adds an index on (table_schema, table_name, partition_name, state) to mysql.analyze_jobs
AddAnalyzeJobsSchemaTablePartitionStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)"
)

func upgradeToVer240(s sessiontypes.Session, ver int64) {
if ver >= version240 {
return
}
doReentrantDDL(s, AddAnalyzeJobsSchemaTableStateIndex, dbterror.ErrDupKeyName)
doReentrantDDL(s, AddAnalyzeJobsSchemaTablePartitionStateIndex, dbterror.ErrDupKeyName)
}

// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
Expand Down
45 changes: 45 additions & 0 deletions pkg/session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2614,3 +2614,48 @@ func (mebd *mockEtcdBackend) EtcdAddrs() ([]string, error) {
func (mebd *mockEtcdBackend) TLSConfig() *tls.Config { return nil }

func (mebd *mockEtcdBackend) StartGCWorker() error { return nil }

func TestTiDBUpgradeToVer240(t *testing.T) {
ctx := context.Background()
store, dom := CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()

ver239 := version239
seV239 := CreateSessionAndSetID(t, store)
txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMutator(txn)
err = m.FinishBootstrap(int64(ver239))
require.NoError(t, err)
revertVersionAndVariables(t, seV239, ver239)
err = txn.Commit(ctx)
require.NoError(t, err)
store.SetOption(StoreBootstrappedKey, nil)

// Check index not exist.
res := MustExecToRecodeSet(t, seV239, "show create table mysql.analyze_jobs")
chk := res.NewChunk(nil)
err = res.Next(ctx, chk)
require.NoError(t, err)
require.Equal(t, 1, chk.NumRows())
require.NotContains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
require.NotContains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")

dom.Close()
domCurVer, err := BootstrapSession(store)
require.NoError(t, err)
defer domCurVer.Close()
seCurVer := CreateSessionAndSetID(t, store)
ver, err := getBootstrapVersion(seCurVer)
require.NoError(t, err)
require.Equal(t, currentBootstrapVersion, ver)

// Check index exist.
res = MustExecToRecodeSet(t, seCurVer, "show create table mysql.analyze_jobs")
chk = res.NewChunk(nil)
err = res.Next(ctx, chk)
require.NoError(t, err)
require.Equal(t, 1, chk.NumRows())
require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")
}
5 changes: 2 additions & 3 deletions pkg/session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {

wg.Wait()
// Make sure the second add index operation is successful.
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d order by job_id", jobID)
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d and table_name in ('upgrade_tbl', 'upgrade_tbl1') order by job_id", jobID)
rows, err := execute(context.Background(), seLatestV, sql)
require.NoError(t, err)
require.GreaterOrEqual(t, len(rows), 2)
Expand All @@ -624,8 +624,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
idxFinishTS = runJob.BinlogInfo.FinishedTS
} else {
// The second add index op.
// notice: upgrade `tidb_runaway_queries` table will happened in `upgradeToVer212` function which is before the second add index op.
if strings.Contains(runJob.TableName, "upgrade_tbl") || strings.Contains(runJob.TableName, "tidb_runaway_queries") {
if strings.Contains(runJob.TableName, "upgrade_tbl") {
require.Greater(t, runJob.BinlogInfo.FinishedTS, idxFinishTS)
} else {
// The upgrade DDL ops. These jobs' finishedTS must less than add index ops.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package priorityqueue_test

import (
"context"
"strings"
"testing"

"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -175,3 +177,52 @@ func TestValidateAndPrepareForDynamicPartitionedTable(t *testing.T) {
require.False(t, valid)
require.Equal(t, "last failed analysis duration is less than 2 times the average analysis duration", failReason)
}

func TestPerformanceOfValidateAndPrepare(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(session.CreateAnalyzeJobs)
tk.MustExec(session.AddAnalyzeJobsSchemaTableStateIndex)
tk.MustExec(session.AddAnalyzeJobsSchemaTablePartitionStateIndex)
tk.MustExec("create database example_schema")
tk.MustExec("use example_schema")
tk.MustExec("create table example_table (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))")
tableInfo, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("example_schema"), model.NewCIStr("example_table"))
require.NoError(t, err)
job := &priorityqueue.DynamicPartitionedTableAnalysisJob{
SchemaName: "example_schema",
GlobalTableID: tableInfo.Meta().ID,
PartitionIDs: map[int64]struct{}{
113: {},
114: {},
},
Weight: 2,
}
initJobs(tk)
insertMultipleFinishedJobs(tk, "example_table", "p0")
se := tk.Session()
sctx := se.(sessionctx.Context)
valid, failReason := job.ValidateAndPrepare(sctx)
require.True(t, valid)
require.Equal(t, "", failReason)

// Insert some failed jobs.
// Just failed.
now := tk.MustQuery("select now()").Rows()[0][0].(string)
insertFailedJobWithStartTime(tk, job.SchemaName, job.GlobalTableName, "p0", now)

// Execute LastFailedDurationQueryForPartition directly to check the query plan.
tableSchema := job.SchemaName
tableName := job.GlobalTableName
partitionNames := []string{"p0", "p1"}

rows, _, err := util.ExecRows(sctx, "explain format='brief' "+priorityqueue.LastFailedDurationQueryForPartition, tableSchema, tableName, partitionNames)
require.NoError(t, err)
planRows := make([]string, 0, len(rows))
for _, row := range rows {
planRows = append(planRows, row.GetString(0))
}
plan := strings.Join(planRows, "\n")
require.Contains(t, plan, "IndexJoin")
require.Contains(t, plan, "IndexRangeScan")
}
6 changes: 4 additions & 2 deletions pkg/statistics/handle/autoanalyze/priorityqueue/interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ const lastFailedDurationQueryForTable = `
LIMIT 1;
`

// LastFailedDurationQueryForPartition is used to get the duration of the last failed analysis for each specified partition.
// Exported for testing.
// For multiple partitions, we only need to return the duration of the most recent failed analysis.
// We pick the minimum duration of all failed analyses because we want to be conservative.
const lastFailedDurationQueryForPartition = `
const LastFailedDurationQueryForPartition = `
SELECT
MIN(TIMESTAMPDIFF(SECOND, aj.start_time, CURRENT_TIMESTAMP)) AS min_duration
FROM (
Expand Down Expand Up @@ -128,7 +130,7 @@ func GetLastFailedAnalysisDuration(
query = lastFailedDurationQueryForTable
params = append(params, schema, tableName)
} else {
query = lastFailedDurationQueryForPartition
query = LastFailedDurationQueryForPartition
params = append(params, schema, tableName, partitionNames)
}

Expand Down