Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: add indexes for mysql.analyze_jobs (#58134) #58355

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(218), session.CurrentBootstrapVersion)
require.Equal(t, int64(219), session.CurrentBootstrapVersion)
}
2 changes: 1 addition & 1 deletion pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ func TestIndexUsageTable(t *testing.T) {
testkit.RowsWithSep("|",
"test|idt2|idx_4"))
tk.MustQuery(`select count(*) from information_schema.tidb_index_usage;`).Check(
testkit.RowsWithSep("|", "78"))
testkit.RowsWithSep("|", "80"))

tk.MustQuery(`select TABLE_SCHEMA, TABLE_NAME, INDEX_NAME from information_schema.tidb_index_usage
where TABLE_SCHEMA = 'test1';`).Check(testkit.Rows())
Expand Down
31 changes: 28 additions & 3 deletions pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,9 @@ const (
instance VARCHAR(512) NOT NULL comment 'address of the TiDB instance executing the analyze job',
process_id BIGINT(64) UNSIGNED comment 'ID of the process executing the analyze job',
PRIMARY KEY (id),
KEY (update_time)
KEY (update_time),
INDEX idx_schema_table_state (table_schema, table_name, state),
INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)
);`
// CreateAdvisoryLocks stores the advisory locks (get_lock, release_lock).
CreateAdvisoryLocks = `CREATE TABLE IF NOT EXISTS mysql.advisory_locks (
Expand Down Expand Up @@ -1192,16 +1194,20 @@ const (
// enable fast_create_table on default
version218 = 218

// version 219
// add modify_params to tidb_global_task and tidb_global_task_history.
version219 = 219

// ...
// [version219, version238] is the version range reserved for patches of 8.5.x
// [version220, version238] is the version range reserved for patches of 8.5.x
// ...

// next version should start with 239
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version218
var currentBootstrapVersion int64 = version219

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1375,6 +1381,7 @@ var (
upgradeToVer216,
upgradeToVer217,
upgradeToVer218,
upgradeToVer219,
}
)

Expand Down Expand Up @@ -3273,6 +3280,24 @@ func upgradeToVer218(_ sessiontypes.Session, ver int64) {
// empty, just make lint happy.
}

const (
// addAnalyzeJobsSchemaTableStateIndex is a DDL statement that adds an index on (table_schema, table_name, state)
// columns to mysql.analyze_jobs table. This index is currently unused since queries filter on partition_name='',
// even for non-partitioned tables. It is kept for potential future optimization where queries could use this
// simpler index directly for non-partitioned tables.
addAnalyzeJobsSchemaTableStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_state (table_schema, table_name, state)"
// addAnalyzeJobsSchemaTablePartitionStateIndex adds an index on (table_schema, table_name, partition_name, state) to mysql.analyze_jobs
addAnalyzeJobsSchemaTablePartitionStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)"
)

func upgradeToVer219(s sessiontypes.Session, ver int64) {
if ver >= version219 {
return
}
doReentrantDDL(s, addAnalyzeJobsSchemaTableStateIndex, dbterror.ErrDupKeyName)
doReentrantDDL(s, addAnalyzeJobsSchemaTablePartitionStateIndex, dbterror.ErrDupKeyName)
}

// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
Expand Down
45 changes: 45 additions & 0 deletions pkg/session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2524,3 +2524,48 @@ func TestIndexJoinMultiPatternByUpgrade650To840(t *testing.T) {
require.Equal(t, 1, row.Len())
require.Equal(t, int64(0), row.GetInt64(0))
}

func TestTiDBUpgradeToVer219(t *testing.T) {
ctx := context.Background()
store, dom := CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()

ver218 := version218
seV218 := CreateSessionAndSetID(t, store)
txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMutator(txn)
err = m.FinishBootstrap(int64(ver218))
require.NoError(t, err)
revertVersionAndVariables(t, seV218, ver218)
err = txn.Commit(ctx)
require.NoError(t, err)
unsetStoreBootstrapped(store.UUID())

// Check if the required indexes already exist in mysql.analyze_jobs (they are created by default in new clusters)
res := MustExecToRecodeSet(t, seV218, "show create table mysql.analyze_jobs")
chk := res.NewChunk(nil)
err = res.Next(ctx, chk)
require.NoError(t, err)
require.Equal(t, 1, chk.NumRows())
require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")

// Check that the indexes still exist after upgrading to the new version and that no errors occurred during the upgrade.
dom.Close()
domCurVer, err := BootstrapSession(store)
require.NoError(t, err)
defer domCurVer.Close()
seCurVer := CreateSessionAndSetID(t, store)
ver, err := getBootstrapVersion(seCurVer)
require.NoError(t, err)
require.Equal(t, currentBootstrapVersion, ver)

res = MustExecToRecodeSet(t, seCurVer, "show create table mysql.analyze_jobs")
chk = res.NewChunk(nil)
err = res.Next(ctx, chk)
require.NoError(t, err)
require.Equal(t, 1, chk.NumRows())
require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")
}
5 changes: 2 additions & 3 deletions pkg/session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {

wg.Wait()
// Make sure the second add index operation is successful.
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d order by job_id", jobID)
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d and table_name in ('upgrade_tbl', 'upgrade_tbl1') order by job_id", jobID)
rows, err := execute(context.Background(), seLatestV, sql)
require.NoError(t, err)
require.GreaterOrEqual(t, len(rows), 2)
Expand All @@ -624,8 +624,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
idxFinishTS = runJob.BinlogInfo.FinishedTS
} else {
// The second add index op.
// notice: upgrade `tidb_runaway_queries` table will happened in `upgradeToVer212` function which is before the second add index op.
if strings.Contains(runJob.TableName, "upgrade_tbl") || strings.Contains(runJob.TableName, "tidb_runaway_queries") {
if strings.Contains(runJob.TableName, "upgrade_tbl") {
require.Greater(t, runJob.BinlogInfo.FinishedTS, idxFinishTS)
} else {
// The upgrade DDL ops. These jobs' finishedTS must less than add index ops.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package priorityqueue_test

import (
"context"
"strings"
"testing"

"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -175,3 +177,50 @@ func TestValidateAndPrepareForDynamicPartitionedTable(t *testing.T) {
require.False(t, valid)
require.Equal(t, "last failed analysis duration is less than 2 times the average analysis duration", failReason)
}

func TestPerformanceOfValidateAndPrepare(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(session.CreateAnalyzeJobs)
tk.MustExec("create database example_schema")
tk.MustExec("use example_schema")
tk.MustExec("create table example_table (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))")
tableInfo, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("example_schema"), model.NewCIStr("example_table"))
require.NoError(t, err)
job := &priorityqueue.DynamicPartitionedTableAnalysisJob{
SchemaName: "example_schema",
GlobalTableID: tableInfo.Meta().ID,
PartitionIDs: map[int64]struct{}{
113: {},
114: {},
},
Weight: 2,
}
initJobs(tk)
insertMultipleFinishedJobs(tk, "example_table", "p0")
se := tk.Session()
sctx := se.(sessionctx.Context)
valid, failReason := job.ValidateAndPrepare(sctx)
require.True(t, valid)
require.Equal(t, "", failReason)

// Insert some failed jobs.
// Just failed.
now := tk.MustQuery("select now()").Rows()[0][0].(string)
insertFailedJobWithStartTime(tk, job.SchemaName, job.GlobalTableName, "p0", now)

// Execute LastFailedDurationQueryForPartition directly to check the query plan.
tableSchema := job.SchemaName
tableName := job.GlobalTableName
partitionNames := []string{"p0", "p1"}

rows, _, err := util.ExecRows(sctx, "explain format='brief' "+priorityqueue.LastFailedDurationQueryForPartition, tableSchema, tableName, partitionNames)
require.NoError(t, err)
planRows := make([]string, 0, len(rows))
for _, row := range rows {
planRows = append(planRows, row.GetString(0))
}
plan := strings.Join(planRows, "\n")
require.Contains(t, plan, "IndexJoin")
require.Contains(t, plan, "IndexRangeScan")
}
6 changes: 4 additions & 2 deletions pkg/statistics/handle/autoanalyze/priorityqueue/interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ const lastFailedDurationQueryForTable = `
LIMIT 1;
`

// LastFailedDurationQueryForPartition is used to get the duration of the last failed analysis for each specified partition.
// Exported for testing.
// For multiple partitions, we only need to return the duration of the most recent failed analysis.
// We pick the minimum duration of all failed analyses because we want to be conservative.
const lastFailedDurationQueryForPartition = `
const LastFailedDurationQueryForPartition = `
SELECT
MIN(TIMESTAMPDIFF(SECOND, aj.start_time, CURRENT_TIMESTAMP)) AS min_duration
FROM (
Expand Down Expand Up @@ -128,7 +130,7 @@ func GetLastFailedAnalysisDuration(
query = lastFailedDurationQueryForTable
params = append(params, schema, tableName)
} else {
query = lastFailedDurationQueryForPartition
query = LastFailedDurationQueryForPartition
params = append(params, schema, tableName, partitionNames)
}

Expand Down
Loading