Skip to content

Commit

Permalink
Merge pull request #138053 from yuzefovich/backport24.2-137960
Browse files Browse the repository at this point in the history
release-24.2: stats: use available type metadata when hydrating UDTs
  • Loading branch information
yuzefovich authored Jan 14, 2025
2 parents 1c240d7 + 3ae5948 commit df935aa
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 51 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,9 @@ func getTableStatsForBackup(
for i := range descs {
if tbl, _, _, _, _ := descpb.GetDescriptors(&descs[i]); tbl != nil {
tableDesc := tabledesc.NewBuilder(tbl).BuildImmutableTable()
tableStatisticsAcc, err := statsCache.GetTableStats(ctx, tableDesc)
// nil typeResolver means that we'll use the latest committed type
// metadata which is acceptable.
tableStatisticsAcc, err := statsCache.GetTableStats(ctx, tableDesc, nil /* typeResolver */)
if err != nil {
log.Warningf(ctx, "failed to collect stats for table: %s, "+
"table ID: %d during a backup: %s", tableDesc.GetName(), tableDesc.GetID(),
Expand Down
15 changes: 13 additions & 2 deletions pkg/sql/distsql_plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand Down Expand Up @@ -285,8 +286,13 @@ func (dsp *DistSQLPlanner) createPartialStatsPlan(
return nil, pgerror.Newf(pgcode.FeatureNotSupported, "multi-column partial statistics are not currently supported")
}

var typeResolver *descs.DistSQLTypeResolver
if p := planCtx.planner; p != nil {
r := descs.NewDistSQLTypeResolver(p.Descriptors(), p.Txn())
typeResolver = &r
}
// Fetch all stats for the table that matches the given table descriptor.
tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(ctx, desc)
tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(ctx, desc, typeResolver)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -662,7 +668,12 @@ func (dsp *DistSQLPlanner) createStatsPlan(
}
}

tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(ctx, desc)
var typeResolver *descs.DistSQLTypeResolver
if p := planCtx.planner; p != nil {
r := descs.NewDistSQLTypeResolver(p.Descriptors(), p.Txn())
typeResolver = &r
}
tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(ctx, desc, typeResolver)
if err != nil {
return nil, err
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/stats
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,17 @@ CREATE TABLE t122312 (s STRING, g greeting AS (s::greeting) STORED);

statement ok
ANALYZE t122312;

# Regression test for not using the latest type metadata after the UDT modification
# within the same txn (#129623).
statement ok
INSERT INTO t122312 VALUES ('hi');

statement ok
ANALYZE t122312;

statement ok
BEGIN;
ALTER TYPE greeting ADD VALUE 'hey';
SELECT * FROM t122312 WHERE g = 'hi';
COMMIT;
8 changes: 7 additions & 1 deletion pkg/sql/opt_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
Expand Down Expand Up @@ -547,8 +548,13 @@ func (oc *optCatalog) dataSourceForTable(
// statistics and the zone config haven't changed.
var tableStats []*stats.TableStatistic
if !flags.NoTableStats {
var typeResolver *descs.DistSQLTypeResolver
if p := oc.planner; p != nil {
r := descs.NewDistSQLTypeResolver(p.Descriptors(), p.Txn())
typeResolver = &r
}
var err error
tableStats, err = oc.planner.execCfg.TableStatsCache.GetTableStats(ctx, desc)
tableStats, err = oc.planner.execCfg.TableStatsCache.GetTableStats(ctx, desc, typeResolver)
if err != nil {
// Ignore any error. We still want to be able to run queries even if we lose
// access to the statistics table.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sem/tree/datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -5340,7 +5340,7 @@ func (d *DEnum) Compare(ctx context.Context, cmpCtx CompareContext, other Datum)
if v.EnumTyp.TypeMeta.Version != d.EnumTyp.TypeMeta.Version {
panic(errors.AssertionFailedf(
"comparison of two different versions of enum %s oid %d: versions %d and %d",
errors.Safe(d.EnumTyp.SQLString), d.EnumTyp.Oid(), d.EnumTyp.TypeMeta.Version,
d.EnumTyp.SQLStringForError(), errors.Safe(d.EnumTyp.Oid()), d.EnumTyp.TypeMeta.Version,
v.EnumTyp.TypeMeta.Version,
))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/stats/automatic_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func (r *Refresher) maybeRefreshStats(
rowsAffected int64,
asOf time.Duration,
) {
tableStats, err := r.cache.getTableStatsFromCache(ctx, tableID, nil /* forecast */, nil /* udtCols */)
tableStats, err := r.cache.getTableStatsFromCache(ctx, tableID, nil /* forecast */, nil /* udtCols */, nil /* typeResolver */)
if err != nil {
log.Errorf(ctx, "failed to get table statistics: %v", err)
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/stats/automatic_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func TestAverageRefreshTime(t *testing.T) {

checkAverageRefreshTime := func(expected time.Duration) error {
return testutils.SucceedsSoonError(func() error {
stats, err := cache.GetTableStats(ctx, table)
stats, err := cache.GetTableStats(ctx, table, nil /* typeResolver */)
if err != nil {
return err
}
Expand All @@ -396,7 +396,7 @@ func TestAverageRefreshTime(t *testing.T) {
// expectedAge time ago if lessThan is true (false).
checkMostRecentStat := func(expectedAge time.Duration, lessThan bool) error {
return testutils.SucceedsSoonError(func() error {
stats, err := cache.GetTableStats(ctx, table)
stats, err := cache.GetTableStats(ctx, table, nil /* typeResolver */)
if err != nil {
return err
}
Expand Down Expand Up @@ -878,7 +878,7 @@ func checkStatsCount(
ctx context.Context, cache *TableStatisticsCache, table catalog.TableDescriptor, expected int,
) error {
return testutils.SucceedsSoonError(func() error {
stats, err := cache.GetTableStats(ctx, table)
stats, err := cache.GetTableStats(ctx, table, nil /* typeResolver */)
if err != nil {
return err
}
Expand All @@ -905,7 +905,7 @@ func compareStatsCountWithZero(
desc :=
desctestutils.TestingGetPublicTableDescriptor(s.DB(), s.Codec(), "system", tableName)
return testutils.SucceedsSoonError(func() error {
stats, err := cache.GetTableStats(ctx, desc)
stats, err := cache.GetTableStats(ctx, desc, nil /* typeResolver */)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/stats/delete_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestDeleteOldStatsForColumns(t *testing.T) {

return testutils.SucceedsSoonError(func() error {
tableStats, err := cache.getTableStatsFromCache(
ctx, tableID, nil /* forecast */, nil, /* udtCols */
ctx, tableID, nil /* forecast */, nil /* udtCols */, nil, /* typeResolver */
)
if err != nil {
return err
Expand All @@ -270,7 +270,7 @@ func TestDeleteOldStatsForColumns(t *testing.T) {
stat := &testData[i]
if stat.TableID != tableID {
stats, err := cache.getTableStatsFromCache(
ctx, stat.TableID, nil /* forecast */, nil, /* udtCols */
ctx, stat.TableID, nil /* forecast */, nil /* udtCols */, nil, /* typeResolver */
)
if err != nil {
return err
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestDeleteOldStatsForOtherColumns(t *testing.T) {

return testutils.SucceedsSoonError(func() error {
tableStats, err := cache.getTableStatsFromCache(
ctx, tableID, nil /* forecast */, nil, /* udtCols */
ctx, tableID, nil /* forecast */, nil /* udtCols */, nil, /* typeResolver */
)
if err != nil {
return err
Expand All @@ -568,7 +568,7 @@ func TestDeleteOldStatsForOtherColumns(t *testing.T) {
stat := &testData[i]
if stat.TableID != tableID {
stats, err := cache.getTableStatsFromCache(
ctx, stat.TableID, nil /* forecast */, nil, /* udtCols */
ctx, stat.TableID, nil /* forecast */, nil /* udtCols */, nil, /* typeResolver */
)
if err != nil {
return err
Expand Down
76 changes: 44 additions & 32 deletions pkg/sql/stats/stats_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,20 +207,24 @@ func decodeTableStatisticsKV(
// and if the stats are not present in the cache, it looks them up in
// system.table_statistics.
//
// typeResolver argument is optional and will be used to hydrate all
// user-defined types. If the resolver is not provided, then the latest
// committed type metadata will be used.
//
// The function returns an error if we could not query the system table. It
// silently ignores any statistics that can't be decoded (e.g. because
// user-defined types don't exist).
//
// The statistics are ordered by their CreatedAt time (newest-to-oldest).
func (sc *TableStatisticsCache) GetTableStats(
ctx context.Context, table catalog.TableDescriptor,
ctx context.Context, table catalog.TableDescriptor, typeResolver *descs.DistSQLTypeResolver,
) (stats []*TableStatistic, err error) {
if !statsUsageAllowed(table, sc.settings) {
return nil, nil
}
forecast := forecastAllowed(table, sc.settings)
return sc.getTableStatsFromCache(
ctx, table.GetID(), &forecast, table.UserDefinedTypeColumns(),
ctx, table.GetID(), &forecast, table.UserDefinedTypeColumns(), typeResolver,
)
}

Expand Down Expand Up @@ -306,7 +310,11 @@ func forecastAllowed(table catalog.TableDescriptor, clusterSettings *cluster.Set
// getTableStatsFromCache is like GetTableStats but assumes that the table ID
// is safe to fetch statistics for: non-system, non-virtual, non-view, etc.
func (sc *TableStatisticsCache) getTableStatsFromCache(
ctx context.Context, tableID descpb.ID, forecast *bool, udtCols []catalog.Column,
ctx context.Context,
tableID descpb.ID,
forecast *bool,
udtCols []catalog.Column,
typeResolver *descs.DistSQLTypeResolver,
) ([]*TableStatistic, error) {
sc.mu.Lock()
defer sc.mu.Unlock()
Expand All @@ -320,7 +328,7 @@ func (sc *TableStatisticsCache) getTableStatsFromCache(
}
}

return sc.addCacheEntryLocked(ctx, tableID, forecast != nil && *forecast)
return sc.addCacheEntryLocked(ctx, tableID, forecast != nil && *forecast, typeResolver)
}

// isStale checks whether we need to evict and re-load the cache entry.
Expand Down Expand Up @@ -396,7 +404,7 @@ func (sc *TableStatisticsCache) lookupStatsLocked(
// - stats are retrieved from database:
// - mutex is locked again and the entry is updated.
func (sc *TableStatisticsCache) addCacheEntryLocked(
ctx context.Context, tableID descpb.ID, forecast bool,
ctx context.Context, tableID descpb.ID, forecast bool, typeResolver *descs.DistSQLTypeResolver,
) (stats []*TableStatistic, err error) {
// Add a cache entry that other queries can find and wait on until we have the
// stats.
Expand All @@ -413,7 +421,7 @@ func (sc *TableStatisticsCache) addCacheEntryLocked(
defer sc.mu.Lock()

log.VEventf(ctx, 1, "reading statistics for table %d", tableID)
stats, udts, err = sc.getTableStatsFromDB(ctx, tableID, forecast, sc.settings)
stats, udts, err = sc.getTableStatsFromDB(ctx, tableID, forecast, sc.settings, typeResolver)
log.VEventf(ctx, 1, "finished reading statistics for table %d", tableID)
}()

Expand Down Expand Up @@ -479,7 +487,7 @@ func (sc *TableStatisticsCache) refreshCacheEntry(

log.VEventf(ctx, 1, "refreshing statistics for table %d", tableID)
// TODO(radu): pass the timestamp and use AS OF SYSTEM TIME.
stats, udts, err = sc.getTableStatsFromDB(ctx, tableID, forecast, sc.settings)
stats, udts, err = sc.getTableStatsFromDB(ctx, tableID, forecast, sc.settings, nil /* typeResolver */)
log.VEventf(ctx, 1, "done refreshing statistics for table %d", tableID)
}()
if e.lastRefreshTimestamp.Equal(ts) {
Expand Down Expand Up @@ -618,7 +626,7 @@ func NewTableStatisticProto(datums tree.Datums) (*TableStatisticProto, error) {
// parseStats converts the given datums to a TableStatistic object. It might
// need to run a query to get user defined type metadata.
func (sc *TableStatisticsCache) parseStats(
ctx context.Context, datums tree.Datums,
ctx context.Context, datums tree.Datums, typeResolver *descs.DistSQLTypeResolver,
) (_ *TableStatistic, _ *types.T, err error) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -644,33 +652,33 @@ func (sc *TableStatisticsCache) parseStats(
res := &TableStatistic{TableStatisticProto: *tsp}
var udt *types.T
if res.HistogramData != nil && (len(res.HistogramData.Buckets) > 0 || res.RowCount == res.NullCount) {
// hydrate the type in case any user defined types are present.
// Hydrate the type in case any user defined types are present.
// There are cases where typ is nil, so don't do anything if so.
if typ := res.HistogramData.ColumnType; typ != nil && typ.UserDefined() {
// The metadata accessed here is never older than the metadata used when
// collecting the stats. Changes to types are backwards compatible across
// versions, so using a newer version of the type metadata here is safe.
// Given that we never delete members from enum types, a descriptor we
// get from the lease manager will be able to be used to decode these stats,
// even if it wasn't the descriptor that was used to collect the stats.
// If have types that are not backwards compatible in this way, then we
// will need to start writing a timestamp on the stats objects and request
// TypeDescriptor's with the timestamp that the stats were recorded with.
//
// TODO(ajwerner): We now do delete members from enum types. See #67050.
if err := sc.db.DescsTxn(ctx, func(
ctx context.Context, txn descs.Txn,
) error {
resolver := descs.NewDistSQLTypeResolver(txn.Descriptors(), txn.KV())
var err error
udt, err = resolver.ResolveTypeByOID(ctx, typ.Oid())
if typeResolver != nil {
udt, err = typeResolver.ResolveTypeByOID(ctx, typ.Oid())
if err != nil {
return nil, nil, err
}
res.HistogramData.ColumnType = udt
return err
}); err != nil {
return nil, nil, err
} else {
// The metadata accessed here is never older than the metadata
// used when collecting the stats. Changes to types are
// backwards compatible across versions, so using a newer
// version of the type metadata here is safe.
if err = sc.db.DescsTxn(ctx, func(
ctx context.Context, txn descs.Txn,
) error {
resolver := descs.NewDistSQLTypeResolver(txn.Descriptors(), txn.KV())
udt, err = resolver.ResolveTypeByOID(ctx, typ.Oid())
res.HistogramData.ColumnType = udt
return err
}); err != nil {
return nil, nil, err
}
}
}
if err := DecodeHistogramBuckets(res); err != nil {
if err = DecodeHistogramBuckets(res); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -784,7 +792,11 @@ func (tsp *TableStatisticProto) IsAuto() bool {
// It ignores any statistics that cannot be decoded (e.g. because a user-defined
// type doesn't exist) and returns the rest (with no error).
func (sc *TableStatisticsCache) getTableStatsFromDB(
ctx context.Context, tableID descpb.ID, forecast bool, st *cluster.Settings,
ctx context.Context,
tableID descpb.ID,
forecast bool,
st *cluster.Settings,
typeResolver *descs.DistSQLTypeResolver,
) (_ []*TableStatistic, _ map[descpb.ColumnID]*types.T, err error) {
getTableStatisticsStmt := `
SELECT
Expand Down Expand Up @@ -835,7 +847,7 @@ ORDER BY "createdAt" DESC, "columnIDs" DESC, "statisticID" DESC
var udts map[descpb.ColumnID]*types.T
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
stats, udt, err := sc.parseStats(ctx, it.Cur())
stats, udt, err := sc.parseStats(ctx, it.Cur(), typeResolver)
if err != nil {
log.Warningf(ctx, "could not decode statistic for table %d: %v", tableID, err)
continue
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/stats/stats_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func checkStatsForTable(

// Perform the lookup and refresh, and confirm the
// returned stats match the expected values.
statsList, err := sc.getTableStatsFromCache(ctx, tableID, nil /* forecast */, nil /* udtCols */)
statsList, err := sc.getTableStatsFromCache(ctx, tableID, nil /* forecast */, nil /* udtCols */, nil /* typeResolver */)
if err != nil {
t.Fatalf("error retrieving stats: %s", err)
}
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestCacheUserDefinedTypes(t *testing.T) {
tbl := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "t", "tt")
// Get stats for our table. We are ensuring here that the access to the stats
// for tt properly hydrates the user defined type t before access.
stats, err := sc.GetTableStats(ctx, tbl)
stats, err := sc.GetTableStats(ctx, tbl, nil /* typeResolver */)
if err != nil {
t.Fatal(err)
}
Expand All @@ -353,7 +353,7 @@ func TestCacheUserDefinedTypes(t *testing.T) {
sc.InvalidateTableStats(ctx, tbl.GetID())
// Verify that GetTableStats ignores the statistic on the now unknown type and
// returns the rest.
stats, err = sc.GetTableStats(ctx, tbl)
stats, err = sc.GetTableStats(ctx, tbl, nil /* typeResolver */)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -402,7 +402,7 @@ func TestCacheWait(t *testing.T) {
for n := 0; n < 10; n++ {
wg.Add(1)
go func() {
stats, err := sc.getTableStatsFromCache(ctx, id, nil /* forecast */, nil /* udtCols */)
stats, err := sc.getTableStatsFromCache(ctx, id, nil /* forecast */, nil /* udtCols */, nil /* typeResolver */)
if err != nil {
t.Error(err)
} else if !checkStats(stats, expectedStats[id]) {
Expand Down Expand Up @@ -451,7 +451,7 @@ func TestCacheAutoRefresh(t *testing.T) {
tableDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), s.Codec(), "test", "t")

expectNStats := func(n int) error {
stats, err := sc.GetTableStats(ctx, tableDesc)
stats, err := sc.GetTableStats(ctx, tableDesc, nil /* typeResolver */)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit df935aa

Please sign in to comment.