diff --git a/br/pkg/restore/snap_client/systable_restore_test.go b/br/pkg/restore/snap_client/systable_restore_test.go
index 8eaf3ad29b545..3759d3d9c8a6a 100644
--- a/br/pkg/restore/snap_client/systable_restore_test.go
+++ b/br/pkg/restore/snap_client/systable_restore_test.go
@@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
 //
 // The above variables are in the file br/pkg/restore/systable_restore.go
 func TestMonitorTheSystemTableIncremental(t *testing.T) {
-	require.Equal(t, int64(218), session.CurrentBootstrapVersion)
+	require.Equal(t, int64(219), session.CurrentBootstrapVersion)
 }
diff --git a/pkg/executor/infoschema_reader_test.go b/pkg/executor/infoschema_reader_test.go
index 25ee563e6c429..80578f87abd0a 100644
--- a/pkg/executor/infoschema_reader_test.go
+++ b/pkg/executor/infoschema_reader_test.go
@@ -663,7 +663,7 @@ func TestIndexUsageTable(t *testing.T) {
 		testkit.RowsWithSep("|", "test|idt2|idx_4"))
 	tk.MustQuery(`select count(*) from information_schema.tidb_index_usage;`).Check(
-		testkit.RowsWithSep("|", "78"))
+		testkit.RowsWithSep("|", "80"))
 
 	tk.MustQuery(`select TABLE_SCHEMA, TABLE_NAME, INDEX_NAME from information_schema.tidb_index_usage where TABLE_SCHEMA = 'test1';`).Check(testkit.Rows())
 
diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go
index 62868e1215fed..ed731af61dfef 100644
--- a/pkg/session/bootstrap.go
+++ b/pkg/session/bootstrap.go
@@ -458,7 +458,9 @@ const (
 		instance VARCHAR(512) NOT NULL comment 'address of the TiDB instance executing the analyze job',
 		process_id BIGINT(64) UNSIGNED comment 'ID of the process executing the analyze job',
 		PRIMARY KEY (id),
-		KEY (update_time)
+		KEY (update_time),
+		INDEX idx_schema_table_state (table_schema, table_name, state),
+		INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)
 	);`
 	// CreateAdvisoryLocks stores the advisory locks (get_lock, release_lock).
 	CreateAdvisoryLocks = `CREATE TABLE IF NOT EXISTS mysql.advisory_locks (
@@ -1192,8 +1194,12 @@ const (
 	// enable fast_create_table on default
 	version218 = 218
 
+	// version 219
+	// add idx_schema_table_state and idx_schema_table_partition_state indexes to mysql.analyze_jobs.
+	version219 = 219
+
 	// ...
-	// [version219, version238] is the version range reserved for patches of 8.5.x
+	// [version220, version238] is the version range reserved for patches of 8.5.x
 	// ...
 
 	// next version should start with 239
@@ -1201,7 +1207,7 @@ const (
 
 // currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
 // please make sure this is the largest version
-var currentBootstrapVersion int64 = version218
+var currentBootstrapVersion int64 = version219
 
 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
@@ -1375,6 +1381,7 @@ var (
 		upgradeToVer216,
 		upgradeToVer217,
 		upgradeToVer218,
+		upgradeToVer219,
 	}
 )
 
@@ -3273,6 +3280,24 @@ func upgradeToVer218(_ sessiontypes.Session, ver int64) {
 	// empty, just make lint happy.
 }
 
+const (
+	// addAnalyzeJobsSchemaTableStateIndex is a DDL statement that adds an index on (table_schema, table_name, state)
+	// columns to the mysql.analyze_jobs table. This index is currently unused since queries filter on partition_name='',
+	// even for non-partitioned tables. It is kept for potential future optimization where queries could use this
+	// simpler index directly for non-partitioned tables.
+	addAnalyzeJobsSchemaTableStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_state (table_schema, table_name, state)"
+	// addAnalyzeJobsSchemaTablePartitionStateIndex adds an index on (table_schema, table_name, partition_name, state) to mysql.analyze_jobs.
+	addAnalyzeJobsSchemaTablePartitionStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)"
+)
+
+func upgradeToVer219(s sessiontypes.Session, ver int64) {
+	if ver >= version219 {
+		return
+	}
+	doReentrantDDL(s, addAnalyzeJobsSchemaTableStateIndex, dbterror.ErrDupKeyName)
+	doReentrantDDL(s, addAnalyzeJobsSchemaTablePartitionStateIndex, dbterror.ErrDupKeyName)
+}
+
 // initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
 func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
 	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
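Both code paths above create the new indexes: a freshly bootstrapped cluster gets them from the amended `CreateAnalyzeJobs` definition, while an upgraded cluster gets them from `upgradeToVer219`. Passing `dbterror.ErrDupKeyName` into `doReentrantDDL` is what makes the step safe to replay if an upgrade is interrupted and restarted. A minimal sketch of that pattern (not the actual `doReentrantDDL` implementation, whose retry details live in bootstrap.go; this reuses bootstrap.go's existing imports):

```go
// Sketch only: the real helper also handles retries and timeouts.
// If the DDL fails with the "ignorable" error, the statement already took
// effect (e.g. the index was created by bootstrap or by an earlier upgrade
// attempt), so the step degrades to a no-op.
func runIgnorableDDL(s sessiontypes.Session, sql string, ignorable *terror.Error) {
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
	if _, err := s.ExecuteInternal(ctx, sql); err != nil && !ignorable.Equal(err) {
		logutil.BgLogger().Fatal("upgrade DDL failed", zap.String("sql", sql), zap.Error(err))
	}
}
```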
diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go
index 9a887fe13dff8..2b75ec739db56 100644
--- a/pkg/session/bootstrap_test.go
+++ b/pkg/session/bootstrap_test.go
@@ -2524,3 +2524,48 @@ func TestIndexJoinMultiPatternByUpgrade650To840(t *testing.T) {
 	require.Equal(t, 1, row.Len())
 	require.Equal(t, int64(0), row.GetInt64(0))
 }
+
+func TestTiDBUpgradeToVer219(t *testing.T) {
+	ctx := context.Background()
+	store, dom := CreateStoreAndBootstrap(t)
+	defer func() { require.NoError(t, store.Close()) }()
+
+	ver218 := version218
+	seV218 := CreateSessionAndSetID(t, store)
+	txn, err := store.Begin()
+	require.NoError(t, err)
+	m := meta.NewMutator(txn)
+	err = m.FinishBootstrap(int64(ver218))
+	require.NoError(t, err)
+	revertVersionAndVariables(t, seV218, ver218)
+	err = txn.Commit(ctx)
+	require.NoError(t, err)
+	unsetStoreBootstrapped(store.UUID())
+
+	// Check if the required indexes already exist in mysql.analyze_jobs (they are created by default in new clusters).
+	res := MustExecToRecodeSet(t, seV218, "show create table mysql.analyze_jobs")
+	chk := res.NewChunk(nil)
+	err = res.Next(ctx, chk)
+	require.NoError(t, err)
+	require.Equal(t, 1, chk.NumRows())
+	require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
+	require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")
+
+	// Check that the indexes still exist after upgrading to the new version and that no errors occurred during the upgrade.
+	dom.Close()
+	domCurVer, err := BootstrapSession(store)
+	require.NoError(t, err)
+	defer domCurVer.Close()
+	seCurVer := CreateSessionAndSetID(t, store)
+	ver, err := getBootstrapVersion(seCurVer)
+	require.NoError(t, err)
+	require.Equal(t, currentBootstrapVersion, ver)
+
+	res = MustExecToRecodeSet(t, seCurVer, "show create table mysql.analyze_jobs")
+	chk = res.NewChunk(nil)
+	err = res.Next(ctx, chk)
+	require.NoError(t, err)
+	require.Equal(t, 1, chk.NumRows())
+	require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
+	require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")
+}
diff --git a/pkg/session/bootstraptest/bootstrap_upgrade_test.go b/pkg/session/bootstraptest/bootstrap_upgrade_test.go
index 3a4449d295176..58282810e2f74 100644
--- a/pkg/session/bootstraptest/bootstrap_upgrade_test.go
+++ b/pkg/session/bootstraptest/bootstrap_upgrade_test.go
@@ -608,7 +608,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
 	wg.Wait()
 
 	// Make sure the second add index operation is successful.
-	sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d order by job_id", jobID)
+	sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d and table_name in ('upgrade_tbl', 'upgrade_tbl1') order by job_id", jobID)
 	rows, err := execute(context.Background(), seLatestV, sql)
 	require.NoError(t, err)
 	require.GreaterOrEqual(t, len(rows), 2)
@@ -624,8 +624,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
 			idxFinishTS = runJob.BinlogInfo.FinishedTS
 		} else {
 			// The second add index op.
-			// notice: upgrade `tidb_runaway_queries` table will happened in `upgradeToVer212` function which is before the second add index op.
-			if strings.Contains(runJob.TableName, "upgrade_tbl") || strings.Contains(runJob.TableName, "tidb_runaway_queries") {
+			if strings.Contains(runJob.TableName, "upgrade_tbl") {
 				require.Greater(t, runJob.BinlogInfo.FinishedTS, idxFinishTS)
 			} else {
 				// The upgrade DDL ops. These jobs' finishedTS must less than add index ops.
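For context on what the loop above inspects: each mysql.tidb_ddl_history row stores a JSON-encoded `model.Job` in its `job_meta` column, and the ver219 upgrade itself now issues ADD INDEX DDLs against mysql.analyze_jobs that also land in the history with `job_id >= jobID`. Filtering on `table_name in ('upgrade_tbl', 'upgrade_tbl1')` keeps the ordering assertions scoped to the test's own tables. A hedged sketch of the decoding step (reusing the test file's imports; `row` stands for one returned chunk.Row):

```go
// Decode the job_meta blob into a DDL job; runJob.TableName and
// runJob.BinlogInfo.FinishedTS are what the assertions above compare.
runJob := model.Job{}
err := runJob.Decode(row.GetBytes(0)) // job_meta is the only selected column
require.NoError(t, err)
```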
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job_test.go
index f753e633773d2..d4851de502b68 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job_test.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job_test.go
@@ -16,12 +16,14 @@ package priorityqueue_test
 
 import (
 	"context"
+	"strings"
 	"testing"
 
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/session"
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
+	"github.com/pingcap/tidb/pkg/statistics/handle/util"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/stretchr/testify/require"
 )
@@ -175,3 +177,50 @@ func TestValidateAndPrepareForDynamicPartitionedTable(t *testing.T) {
 	require.False(t, valid)
 	require.Equal(t, "last failed analysis duration is less than 2 times the average analysis duration", failReason)
 }
+
+func TestPerformanceOfValidateAndPrepare(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec(session.CreateAnalyzeJobs)
+	tk.MustExec("create database example_schema")
+	tk.MustExec("use example_schema")
+	tk.MustExec("create table example_table (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))")
+	tableInfo, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("example_schema"), model.NewCIStr("example_table"))
+	require.NoError(t, err)
+	job := &priorityqueue.DynamicPartitionedTableAnalysisJob{
+		SchemaName:    "example_schema",
+		GlobalTableID: tableInfo.Meta().ID,
+		PartitionIDs: map[int64]struct{}{
+			113: {},
+			114: {},
+		},
+		Weight: 2,
+	}
+	initJobs(tk)
+	insertMultipleFinishedJobs(tk, "example_table", "p0")
+	se := tk.Session()
+	sctx := se.(sessionctx.Context)
+	valid, failReason := job.ValidateAndPrepare(sctx)
+	require.True(t, valid)
+	require.Equal(t, "", failReason)
+
+	// Insert some failed jobs.
+	// Just failed.
+	now := tk.MustQuery("select now()").Rows()[0][0].(string)
+	insertFailedJobWithStartTime(tk, job.SchemaName, job.GlobalTableName, "p0", now)
+
+	// Execute LastFailedDurationQueryForPartition directly to check the query plan.
+	tableSchema := job.SchemaName
+	tableName := job.GlobalTableName
+	partitionNames := []string{"p0", "p1"}
+
+	rows, _, err := util.ExecRows(sctx, "explain format='brief' "+priorityqueue.LastFailedDurationQueryForPartition, tableSchema, tableName, partitionNames)
+	require.NoError(t, err)
+	planRows := make([]string, 0, len(rows))
+	for _, row := range rows {
+		planRows = append(planRows, row.GetString(0))
+	}
+	plan := strings.Join(planRows, "\n")
+	require.Contains(t, plan, "IndexJoin")
+	require.Contains(t, plan, "IndexRangeScan")
+}
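The helpers `initJobs`, `insertMultipleFinishedJobs`, and `insertFailedJobWithStartTime` are defined elsewhere in this test package. A hedged sketch of what the failed-job insert presumably does, with the column list inferred from the `CreateAnalyzeJobs` definition amended above (the job_info and instance values are illustrative):

```go
// Illustrative only: inserts one failed analyze job keyed by exactly the
// columns the new idx_schema_table_partition_state index covers.
func insertFailedJobExample(tk *testkit.TestKit, schema, table, partition, startTime string) {
	tk.MustExec(`insert into mysql.analyze_jobs
			(table_schema, table_name, partition_name, job_info, start_time, state, instance)
		values (?, ?, ?, 'analyze table', ?, 'failed', '127.0.0.1:4000')`,
		schema, table, partition, startTime)
}
```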
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go b/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go
index 95f09bfddbcff..83911a8a6bdb0 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go
@@ -59,9 +59,11 @@ const lastFailedDurationQueryForTable = `
 	LIMIT 1;
 `
 
+// LastFailedDurationQueryForPartition is used to get the duration of the last failed analysis for each specified partition.
+// Exported for testing.
 // For multiple partitions, we only need to return the duration of the most recent failed analysis.
 // We pick the minimum duration of all failed analyses because we want to be conservative.
-const lastFailedDurationQueryForPartition = `
+const LastFailedDurationQueryForPartition = `
 	SELECT
 		MIN(TIMESTAMPDIFF(SECOND, aj.start_time, CURRENT_TIMESTAMP)) AS min_duration
 	FROM (
@@ -128,7 +130,7 @@ func GetLastFailedAnalysisDuration(
 		query = lastFailedDurationQueryForTable
 		params = append(params, schema, tableName)
 	} else {
-		query = lastFailedDurationQueryForPartition
+		query = LastFailedDurationQueryForPartition
 		params = append(params, schema, tableName, partitionNames)
 	}
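A usage sketch of the newly exported constant, mirroring the dispatch in `GetLastFailedAnalysisDuration` above (the schema, table, and partition names are illustrative, and the null/type handling is an assumption rather than the production code; imports follow the test file above, plus "time"):

```go
// With the ver219 indexes in place, this lookup on mysql.analyze_jobs can be
// served by idx_schema_table_partition_state instead of a full table scan.
rows, _, err := util.ExecRows(
	sctx,
	priorityqueue.LastFailedDurationQueryForPartition,
	"example_schema", "example_table", []string{"p0", "p1"},
)
if err == nil && len(rows) > 0 && !rows[0].IsNull(0) {
	// MIN(TIMESTAMPDIFF(SECOND, ...)) yields seconds since the most recent
	// failed analysis across the given partitions.
	lastFailed := time.Duration(rows[0].GetInt64(0)) * time.Second
	_ = lastFailed
}
```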