diff --git a/br/pkg/restore/snap_client/systable_restore_test.go b/br/pkg/restore/snap_client/systable_restore_test.go
index 9e1812aabaebb..6142a270745db 100644
--- a/br/pkg/restore/snap_client/systable_restore_test.go
+++ b/br/pkg/restore/snap_client/systable_restore_test.go
@@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
 //
 // The above variables are in the file br/pkg/restore/systable_restore.go
 func TestMonitorTheSystemTableIncremental(t *testing.T) {
-    require.Equal(t, int64(239), session.CurrentBootstrapVersion)
+    require.Equal(t, int64(240), session.CurrentBootstrapVersion)
 }
diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go
index 0a24c5cbaa266..75d9ce0dcd6cf 100644
--- a/pkg/session/bootstrap.go
+++ b/pkg/session/bootstrap.go
@@ -458,7 +458,9 @@ const (
         instance VARCHAR(512) NOT NULL comment 'address of the TiDB instance executing the analyze job',
         process_id BIGINT(64) UNSIGNED comment 'ID of the process executing the analyze job',
         PRIMARY KEY (id),
-        KEY (update_time)
+        KEY (update_time),
+        INDEX idx_schema_table_state (table_schema, table_name, state),
+        INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)
     );`
     // CreateAdvisoryLocks stores the advisory locks (get_lock, release_lock).
     CreateAdvisoryLocks = `CREATE TABLE IF NOT EXISTS mysql.advisory_locks (
@@ -1203,11 +1205,15 @@ const (
     // version 239
     // add modify_params to tidb_global_task and tidb_global_task_history.
     version239 = 239
+
+    // version 240
+    // Add indexes to mysql.analyze_jobs to speed up queries that filter by schema, table, partition, and state.
+    version240 = 240
 )

 // currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
 // please make sure this is the largest version
-var currentBootstrapVersion int64 = version239
+var currentBootstrapVersion int64 = version240

 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
 var internalSQLTimeout = owner.ManagerSessionTTL + 15
@@ -1382,6 +1388,7 @@ var (
         upgradeToVer217,
         upgradeToVer218,
         upgradeToVer239,
+        upgradeToVer240,
     }
 )

@@ -3287,6 +3294,24 @@ func upgradeToVer239(s sessiontypes.Session, ver int64) {
     doReentrantDDL(s, "ALTER TABLE mysql.tidb_global_task_history ADD COLUMN modify_params json AFTER `error`;", infoschema.ErrColumnExists)
 }

+const (
+    // addAnalyzeJobsSchemaTableStateIndex is a DDL statement that adds an index on the (table_schema, table_name, state)
+    // columns of the mysql.analyze_jobs table. This index is currently unused since queries filter on partition_name='',
+    // even for non-partitioned tables. It is kept for a potential future optimization where queries could use this
+    // simpler index directly for non-partitioned tables.
+    addAnalyzeJobsSchemaTableStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_state (table_schema, table_name, state)"
+    // addAnalyzeJobsSchemaTablePartitionStateIndex adds an index on the (table_schema, table_name, partition_name, state) columns to mysql.analyze_jobs.
+    addAnalyzeJobsSchemaTablePartitionStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)"
+)
+
+func upgradeToVer240(s sessiontypes.Session, ver int64) {
+    if ver >= version240 {
+        return
+    }
+    doReentrantDDL(s, addAnalyzeJobsSchemaTableStateIndex, dbterror.ErrDupKeyName)
+    doReentrantDDL(s, addAnalyzeJobsSchemaTablePartitionStateIndex, dbterror.ErrDupKeyName)
+}
+
 // initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
 func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
     ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go
index 9ff0d449be6ab..01ad9a797b1f7 100644
--- a/pkg/session/bootstrap_test.go
+++ b/pkg/session/bootstrap_test.go
@@ -2614,3 +2614,48 @@ func (mebd *mockEtcdBackend) EtcdAddrs() ([]string, error) {
 func (mebd *mockEtcdBackend) TLSConfig() *tls.Config { return nil }

 func (mebd *mockEtcdBackend) StartGCWorker() error { return nil }
+
+func TestTiDBUpgradeToVer240(t *testing.T) {
+    ctx := context.Background()
+    store, dom := CreateStoreAndBootstrap(t)
+    defer func() { require.NoError(t, store.Close()) }()
+
+    ver239 := version239
+    seV239 := CreateSessionAndSetID(t, store)
+    txn, err := store.Begin()
+    require.NoError(t, err)
+    m := meta.NewMutator(txn)
+    err = m.FinishBootstrap(int64(ver239))
+    require.NoError(t, err)
+    revertVersionAndVariables(t, seV239, ver239)
+    err = txn.Commit(ctx)
+    require.NoError(t, err)
+    store.SetOption(StoreBootstrappedKey, nil)
+
+    // Check if the required indexes already exist in mysql.analyze_jobs (they are created by default in new clusters).
+    res := MustExecToRecodeSet(t, seV239, "show create table mysql.analyze_jobs")
+    chk := res.NewChunk(nil)
+    err = res.Next(ctx, chk)
+    require.NoError(t, err)
+    require.Equal(t, 1, chk.NumRows())
+    require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
+    require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")
+
+    // Check that the indexes still exist after upgrading to the new version and that no errors occurred during the upgrade.
+    dom.Close()
+    domCurVer, err := BootstrapSession(store)
+    require.NoError(t, err)
+    defer domCurVer.Close()
+    seCurVer := CreateSessionAndSetID(t, store)
+    ver, err := getBootstrapVersion(seCurVer)
+    require.NoError(t, err)
+    require.Equal(t, currentBootstrapVersion, ver)
+
+    res = MustExecToRecodeSet(t, seCurVer, "show create table mysql.analyze_jobs")
+    chk = res.NewChunk(nil)
+    err = res.Next(ctx, chk)
+    require.NoError(t, err)
+    require.Equal(t, 1, chk.NumRows())
+    require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
+    require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")
+}
diff --git a/pkg/session/bootstraptest/bootstrap_upgrade_test.go b/pkg/session/bootstraptest/bootstrap_upgrade_test.go
index de5ad55e956d0..6a5073176e066 100644
--- a/pkg/session/bootstraptest/bootstrap_upgrade_test.go
+++ b/pkg/session/bootstraptest/bootstrap_upgrade_test.go
@@ -608,7 +608,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
     wg.Wait()
     // Make sure the second add index operation is successful.
-    sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d order by job_id", jobID)
+    sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d and table_name in ('upgrade_tbl', 'upgrade_tbl1') order by job_id", jobID)
     rows, err := execute(context.Background(), seLatestV, sql)
     require.NoError(t, err)
     require.GreaterOrEqual(t, len(rows), 2)
@@ -624,8 +624,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
             idxFinishTS = runJob.BinlogInfo.FinishedTS
         } else {
             // The second add index op.
-            // notice: upgrade `tidb_runaway_queries` table will happened in `upgradeToVer212` function which is before the second add index op.
-            if strings.Contains(runJob.TableName, "upgrade_tbl") || strings.Contains(runJob.TableName, "tidb_runaway_queries") {
+            if strings.Contains(runJob.TableName, "upgrade_tbl") {
                 require.Greater(t, runJob.BinlogInfo.FinishedTS, idxFinishTS)
             } else {
                 // The upgrade DDL ops. These jobs' finishedTS must less than add index ops.
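
Note on the bootstrap changes above: every step in the upgrade chain follows the same guard-and-retry shape as upgradeToVer240, and doReentrantDDL tolerating dbterror.ErrDupKeyName is what makes the step safe to re-run, both after a crashed upgrade and on clusters whose CreateAnalyzeJobs definition already contains the indexes. Below is a minimal self-contained sketch of that pattern; the session interface, error value, and error-returning signatures are illustrative stand-ins, not the real pkg/session types.

package main

import (
	"errors"
	"fmt"
)

// errDupKeyName stands in for dbterror.ErrDupKeyName: the error a re-run of
// "ADD INDEX" reports when the index already exists.
var errDupKeyName = errors.New("duplicate key name")

// session is a stand-in for the real sessiontypes.Session.
type session interface {
	execute(sql string) error
}

// doReentrantDDL mirrors the shape of the real helper: run the DDL and treat
// the listed errors as "already applied", so the whole upgrade step can
// simply be executed again after a partial failure.
func doReentrantDDL(s session, sql string, ignorable ...error) error {
	err := s.execute(sql)
	for _, ig := range ignorable {
		if errors.Is(err, ig) {
			return nil // the index already exists; nothing to do
		}
	}
	return err
}

const version240 = 240

func upgradeToVer240(s session, ver int64) error {
	if ver >= version240 {
		return nil // this step was already applied on a previous run
	}
	if err := doReentrantDDL(s, "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_state (table_schema, table_name, state)", errDupKeyName); err != nil {
		return err
	}
	return doReentrantDDL(s, "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)", errDupKeyName)
}

// fakeSession always reports a duplicate index, emulating a cluster where
// the indexes were already created by CreateAnalyzeJobs.
type fakeSession struct{}

func (fakeSession) execute(sql string) error {
	fmt.Println("executing:", sql)
	return errDupKeyName
}

func main() {
	// Re-running the step against an already-upgraded cluster is a no-op.
	if err := upgradeToVer240(fakeSession{}, 239); err != nil {
		panic(err)
	}
}

The version guard means the step runs at most once per cluster; the ignorable-error list means re-running inside the step is harmless. TestTiDBUpgradeToVer240 above exercises exactly this path by rewinding the stored version to 239 and bootstrapping again.
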
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job_test.go
index f753e633773d2..d4851de502b68 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job_test.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job_test.go
@@ -16,12 +16,14 @@ package priorityqueue_test

 import (
     "context"
+    "strings"
     "testing"

     "github.com/pingcap/tidb/pkg/parser/model"
     "github.com/pingcap/tidb/pkg/session"
     "github.com/pingcap/tidb/pkg/sessionctx"
     "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
+    "github.com/pingcap/tidb/pkg/statistics/handle/util"
     "github.com/pingcap/tidb/pkg/testkit"
     "github.com/stretchr/testify/require"
 )
@@ -175,3 +177,50 @@ func TestValidateAndPrepareForDynamicPartitionedTable(t *testing.T) {
     require.False(t, valid)
     require.Equal(t, "last failed analysis duration is less than 2 times the average analysis duration", failReason)
 }
+
+func TestPerformanceOfValidateAndPrepare(t *testing.T) {
+    store, dom := testkit.CreateMockStoreAndDomain(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec(session.CreateAnalyzeJobs)
+    tk.MustExec("create database example_schema")
+    tk.MustExec("use example_schema")
+    tk.MustExec("create table example_table (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))")
+    tableInfo, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("example_schema"), model.NewCIStr("example_table"))
+    require.NoError(t, err)
+    job := &priorityqueue.DynamicPartitionedTableAnalysisJob{
+        SchemaName:    "example_schema",
+        GlobalTableID: tableInfo.Meta().ID,
+        PartitionIDs: map[int64]struct{}{
+            113: {},
+            114: {},
+        },
+        Weight: 2,
+    }
+    initJobs(tk)
+    insertMultipleFinishedJobs(tk, "example_table", "p0")
+    se := tk.Session()
+    sctx := se.(sessionctx.Context)
+    valid, failReason := job.ValidateAndPrepare(sctx)
+    require.True(t, valid)
+    require.Equal(t, "", failReason)
+
+    // Insert some failed jobs.
+    // Just failed.
+    now := tk.MustQuery("select now()").Rows()[0][0].(string)
+    insertFailedJobWithStartTime(tk, job.SchemaName, job.GlobalTableName, "p0", now)
+
+    // Execute LastFailedDurationQueryForPartition directly to check the query plan.
+    tableSchema := job.SchemaName
+    tableName := job.GlobalTableName
+    partitionNames := []string{"p0", "p1"}
+
+    rows, _, err := util.ExecRows(sctx, "explain format='brief' "+priorityqueue.LastFailedDurationQueryForPartition, tableSchema, tableName, partitionNames)
+    require.NoError(t, err)
+    planRows := make([]string, 0, len(rows))
+    for _, row := range rows {
+        planRows = append(planRows, row.GetString(0))
+    }
+    plan := strings.Join(planRows, "\n")
+    require.Contains(t, plan, "IndexJoin")
+    require.Contains(t, plan, "IndexRangeScan")
+}
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go b/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go
index 95f09bfddbcff..83911a8a6bdb0 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go
@@ -59,9 +59,11 @@ const lastFailedDurationQueryForTable = `
     LIMIT 1;
 `

+// LastFailedDurationQueryForPartition is used to get the duration of the last failed analysis for each specified partition.
+// Exported for testing.
 // For multiple partitions, we only need to return the duration of the most recent failed analysis.
 // We pick the minimum duration of all failed analyses because we want to be conservative.
-const lastFailedDurationQueryForPartition = `
+const LastFailedDurationQueryForPartition = `
     SELECT
         MIN(TIMESTAMPDIFF(SECOND, aj.start_time, CURRENT_TIMESTAMP)) AS min_duration
     FROM (
@@ -128,7 +130,7 @@ func GetLastFailedAnalysisDuration(
         query = lastFailedDurationQueryForTable
         params = append(params, schema, tableName)
     } else {
-        query = lastFailedDurationQueryForPartition
+        query = LastFailedDurationQueryForPartition
         params = append(params, schema, tableName, partitionNames)
     }
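
A closing note on the test and query changes: util.ExecRows is handed partitionNames ([]string) as the value for a single %? placeholder, and, as far as I can tell from TiDB's internal SQL escaping helpers, a slice argument is expanded into a comma-separated list of quoted values, which is what lets one placeholder feed the query's partition-name list. With idx_schema_table_partition_state in place, that lookup can then be answered by an IndexRangeScan over (table_schema, table_name, partition_name, state), which is exactly what the new plan assertions check. A rough sketch of the expansion behavior follows; expandStringSlice is a made-up name, and the real logic (in TiDB's sqlescape package) handles many more verbs, types, and escaping rules.

package main

import (
	"fmt"
	"strings"
)

// expandStringSlice loosely mimics how a []string bound to one %?
// placeholder becomes a comma-separated list of quoted values, which is
// what lets a single placeholder serve a `partition_name IN (...)` filter.
func expandStringSlice(names []string) string {
	quoted := make([]string, 0, len(names))
	for _, n := range names {
		// Minimal quoting for illustration only; the real escaper also
		// handles backslashes and other special characters.
		quoted = append(quoted, "'"+strings.ReplaceAll(n, "'", "''")+"'")
	}
	return strings.Join(quoted, ",")
}

func main() {
	partitions := []string{"p0", "p1"}
	fmt.Printf("... AND aj.partition_name IN (%s)\n", expandStringSlice(partitions))
	// Prints: ... AND aj.partition_name IN ('p0','p1')
}
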