diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 3d18f37af297d..b85c01f6bfd30 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -221,7 +221,9 @@ func (catalog *Catalog) RelayFromSysTableObjects( dataFactory DataFactory, readFunc func(context.Context, *TableEntry, txnif.AsyncTxn) *containers.Batch, sortFunc func([]containers.Vector, int) error, -) { + replayer ObjectListReplayer, +) (closeCB []func()) { + closeCB = make([]func(), 0) db, err := catalog.GetDatabaseByID(pkgcatalog.MO_CATALOG_ID) if err != nil { panic(err) @@ -265,7 +267,7 @@ func (catalog *Catalog) RelayFromSysTableObjects( // replay database catalog if dbBatch := readFunc(ctx, dbTbl, readTxn); dbBatch != nil { - defer dbBatch.Close() + closeCB = append(closeCB, dbBatch.Close) catalog.ReplayMODatabase(ctx, txnNode, dbBatch) } @@ -274,27 +276,33 @@ func (catalog *Catalog) RelayFromSysTableObjects( if err := sortFunc(tableBatch.Vecs, pkgcatalog.MO_TABLES_REL_ID_IDX); err != nil { panic(err) } - defer tableBatch.Close() + closeCB = append(closeCB, tableBatch.Close) columnBatch := readFunc(ctx, columnTbl, readTxn) if err := sortFunc(columnBatch.Vecs, pkgcatalog.MO_COLUMNS_ATT_RELNAME_ID_IDX); err != nil { panic(err) } - defer columnBatch.Close() - catalog.ReplayMOTables(ctx, txnNode, dataFactory, tableBatch, columnBatch) + closeCB = append(closeCB, columnBatch.Close) + catalog.ReplayMOTables(ctx, txnNode, dataFactory, tableBatch, columnBatch, replayer) } // logutil.Info(catalog.SimplePPString(common.PPL3)) + return } func (catalog *Catalog) ReplayMODatabase(ctx context.Context, txnNode *txnbase.TxnMVCCNode, bat *containers.Batch) { + dbids := vector.MustFixedColNoTypeCheck[uint64](bat.GetVectorByName(pkgcatalog.SystemDBAttr_ID).GetDownstreamVector()) + tenantIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_AccID).GetDownstreamVector()) + userIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_Creator).GetDownstreamVector()) + roleIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_Owner).GetDownstreamVector()) + createAts := vector.MustFixedColNoTypeCheck[types.Timestamp](bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateAt).GetDownstreamVector()) for i := 0; i < bat.Length(); i++ { - dbid := bat.GetVectorByName(pkgcatalog.SystemDBAttr_ID).Get(i).(uint64) - name := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).Get(i).([]byte)) - tenantID := bat.GetVectorByName(pkgcatalog.SystemDBAttr_AccID).Get(i).(uint32) - userID := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Creator).Get(i).(uint32) - roleID := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Owner).Get(i).(uint32) - createAt := bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateAt).Get(i).(types.Timestamp) - createSql := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).Get(i).([]byte)) - datType := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).Get(i).([]byte)) + dbid := dbids[i] + name := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).GetDownstreamVector().GetStringAt(i) + tenantID := tenantIDs[i] + userID := userIDs[i] + roleID := roleIDs[i] + createAt := createAts[i] + createSql := bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).GetDownstreamVector().GetStringAt(i) + datType := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).GetDownstreamVector().GetStringAt(i) catalog.onReplayCreateDB(dbid, name, txnNode, tenantID, userID, roleID, createAt, createSql, datType) } } @@ -336,34 +344,63 @@ func (catalog *Catalog) onReplayCreateDB( db.InsertLocked(un) } -func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.TxnMVCCNode, dataF DataFactory, tblBat, colBat *containers.Batch) { +func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.TxnMVCCNode, dataF DataFactory, tblBat, colBat *containers.Batch, replayer ObjectListReplayer) { + tids := vector.MustFixedColNoTypeCheck[uint64](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ID).GetDownstreamVector()) + dbids := vector.MustFixedColNoTypeCheck[uint64](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_DBID).GetDownstreamVector()) + versions := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Version).GetDownstreamVector()) + catalogVersions := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CatalogVersion).GetDownstreamVector()) + partitioneds := vector.MustFixedColNoTypeCheck[int8](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partitioned).GetDownstreamVector()) + roleIDs := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Owner).GetDownstreamVector()) + userIDs := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Creator).GetDownstreamVector()) + createAts := vector.MustFixedColNoTypeCheck[types.Timestamp](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateAt).GetDownstreamVector()) + tenantIDs := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_AccID).GetDownstreamVector()) + + colTids := vector.MustFixedColNoTypeCheck[uint64](colBat.GetVectorByName(pkgcatalog.SystemColAttr_RelID).GetDownstreamVector()) + nullables := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_NullAbility).GetDownstreamVector()) + isHiddens := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_IsHidden).GetDownstreamVector()) + clusterbys := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_IsClusterBy).GetDownstreamVector()) + autoIncrements := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_IsAutoIncrement).GetDownstreamVector()) + idxes := vector.MustFixedColNoTypeCheck[int32](colBat.GetVectorByName(pkgcatalog.SystemColAttr_Num).GetDownstreamVector()) + seqNums := vector.MustFixedColNoTypeCheck[uint16](colBat.GetVectorByName(pkgcatalog.SystemColAttr_Seqnum).GetDownstreamVector()) + schemaOffset := 0 for i := 0; i < tblBat.Length(); i++ { - tid := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ID).Get(i).(uint64) - dbid := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_DBID).Get(i).(uint64) - name := string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte)) - schema := NewEmptySchema(name) - schemaOffset = schema.ReadFromBatch(colBat, schemaOffset, tid) - schema.Comment = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte)) - schema.Version = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Version).Get(i).(uint32) - schema.CatalogVersion = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CatalogVersion).Get(i).(uint32) - schema.Partitioned = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partitioned).Get(i).(int8) - schema.Partition = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte)) - schema.Relkind = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte)) - schema.Createsql = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte)) - schema.View = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte)) - schema.Constraint = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte) - schema.AcInfo = accessInfo{} - schema.AcInfo.RoleID = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Owner).Get(i).(uint32) - schema.AcInfo.UserID = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Creator).Get(i).(uint32) - schema.AcInfo.CreateAt = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateAt).Get(i).(types.Timestamp) - schema.AcInfo.TenantID = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_AccID).Get(i).(uint32) - extra := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte) - schema.MustRestoreExtra(extra) - if err := schema.Finalize(true); err != nil { - panic(err) + startOffset := schemaOffset + tid := tids[i] + for i := startOffset; i < len(colTids); i++ { + if tid != colTids[i] { + schemaOffset = i + break + } + } + replayFn := func() { + dbid := dbids[i] + name := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).GetDownstreamVector().GetStringAt(i) + schema := NewEmptySchema(name) + schema.ReadFromBatch( + colBat, colTids, nullables, isHiddens, clusterbys, autoIncrements, idxes, seqNums, startOffset, tid) + schema.Comment = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).GetDownstreamVector().GetStringAt(i) + schema.Version = versions[i] + schema.CatalogVersion = catalogVersions[i] + schema.Partitioned = partitioneds[i] + schema.Partition = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).GetDownstreamVector().GetStringAt(i) + schema.Relkind = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).GetDownstreamVector().GetStringAt(i) + schema.Createsql = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).GetDownstreamVector().GetStringAt(i) + schema.View = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).GetDownstreamVector().GetStringAt(i) + schema.Constraint = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).GetDownstreamVector().GetBytesAt(i) + schema.AcInfo = accessInfo{} + schema.AcInfo.RoleID = roleIDs[i] + schema.AcInfo.UserID = userIDs[i] + schema.AcInfo.CreateAt = createAts[i] + schema.AcInfo.TenantID = tenantIDs[i] + extra := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).GetDownstreamVector().GetBytesAt(i) + schema.MustRestoreExtra(extra) + if err := schema.Finalize(true); err != nil { + panic(err) + } + catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF) } - catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF) + replayer.Submit(dbids[i], replayFn) } } diff --git a/pkg/vm/engine/tae/catalog/schema.go b/pkg/vm/engine/tae/catalog/schema.go index c3c920a589364..86101bf928d94 100644 --- a/pkg/vm/engine/tae/catalog/schema.go +++ b/pkg/vm/engine/tae/catalog/schema.go @@ -603,9 +603,15 @@ func (s *Schema) Marshal() (buf []byte, err error) { return } -func (s *Schema) ReadFromBatch(bat *containers.Batch, offset int, targetTid uint64) (next int) { +func (s *Schema) ReadFromBatch( + bat *containers.Batch, + tids []uint64, + nullables, isHiddens, clusterbys, autoIncrements []int8, + idxes []int32, + seqNums []uint16, + offset int, + targetTid uint64) (next int) { nameVec := bat.GetVectorByName(pkgcatalog.SystemColAttr_RelName) - tidVec := bat.GetVectorByName(pkgcatalog.SystemColAttr_RelID) defer func() { slices.SortStableFunc(s.ColDefs, func(i, j *ColDef) int { return i.Idx - j.Idx @@ -615,39 +621,39 @@ func (s *Schema) ReadFromBatch(bat *containers.Batch, offset int, targetTid uint if offset >= nameVec.Length() { break } - name := string(nameVec.Get(offset).([]byte)) - id := tidVec.Get(offset).(uint64) + name := nameVec.GetDownstreamVector().GetStringAt(offset) + id := tids[offset] // every schema has 1 rowid column as last column, if have one, break if name != s.Name || targetTid != id { break } def := new(ColDef) - def.Name = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_Name)).Get(offset).([]byte)) - data := bat.GetVectorByName((pkgcatalog.SystemColAttr_Type)).Get(offset).([]byte) + def.Name = bat.GetVectorByName((pkgcatalog.SystemColAttr_Name)).GetDownstreamVector().GetStringAt(offset) + data := bat.GetVectorByName((pkgcatalog.SystemColAttr_Type)).GetDownstreamVector().GetBytesAt(offset) types.Decode(data, &def.Type) - nullable := bat.GetVectorByName((pkgcatalog.SystemColAttr_NullAbility)).Get(offset).(int8) + nullable := nullables[offset] def.NullAbility = !i82bool(nullable) - isHidden := bat.GetVectorByName((pkgcatalog.SystemColAttr_IsHidden)).Get(offset).(int8) + isHidden := isHiddens[offset] def.Hidden = i82bool(isHidden) - isClusterBy := bat.GetVectorByName((pkgcatalog.SystemColAttr_IsClusterBy)).Get(offset).(int8) + isClusterBy := clusterbys[offset] def.ClusterBy = i82bool(isClusterBy) if def.ClusterBy { def.SortKey = true } - isAutoIncrement := bat.GetVectorByName((pkgcatalog.SystemColAttr_IsAutoIncrement)).Get(offset).(int8) + isAutoIncrement := autoIncrements[offset] def.AutoIncrement = i82bool(isAutoIncrement) - def.Comment = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_Comment)).Get(offset).([]byte)) - def.OnUpdate = bat.GetVectorByName((pkgcatalog.SystemColAttr_Update)).Get(offset).([]byte) - def.Default = bat.GetVectorByName((pkgcatalog.SystemColAttr_DefaultExpr)).Get(offset).([]byte) - def.Idx = int(bat.GetVectorByName((pkgcatalog.SystemColAttr_Num)).Get(offset).(int32)) - 1 - def.SeqNum = bat.GetVectorByName(pkgcatalog.SystemColAttr_Seqnum).Get(offset).(uint16) - def.EnumValues = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_EnumValues)).Get(offset).([]byte)) + def.Comment = bat.GetVectorByName((pkgcatalog.SystemColAttr_Comment)).GetDownstreamVector().GetStringAt(offset) + def.OnUpdate = bat.GetVectorByName((pkgcatalog.SystemColAttr_Update)).GetDownstreamVector().GetBytesAt(offset) + def.Default = bat.GetVectorByName((pkgcatalog.SystemColAttr_DefaultExpr)).GetDownstreamVector().GetBytesAt(offset) + def.Idx = int(idxes[offset]) - 1 + def.SeqNum = seqNums[offset] + def.EnumValues = bat.GetVectorByName((pkgcatalog.SystemColAttr_EnumValues)).GetDownstreamVector().GetStringAt(offset) s.NameMap[def.Name] = def.Idx s.ColDefs = append(s.ColDefs, def) if def.Name == PhyAddrColumnName { def.PhyAddr = true } - constraint := string(bat.GetVectorByName(pkgcatalog.SystemColAttr_ConstraintType).Get(offset).([]byte)) + constraint := bat.GetVectorByName(pkgcatalog.SystemColAttr_ConstraintType).GetDownstreamVector().GetStringAt(offset) if constraint == "p" { def.SortKey = true def.Primary = true diff --git a/pkg/vm/engine/tae/db/checkpoint/replay.go b/pkg/vm/engine/tae/db/checkpoint/replay.go index f50aecd2dcf98..59c7ccbf2ab5d 100644 --- a/pkg/vm/engine/tae/db/checkpoint/replay.go +++ b/pkg/vm/engine/tae/db/checkpoint/replay.go @@ -394,13 +394,19 @@ func (c *CkpReplayer) ReplayCatalog(readTxn txnif.AsyncTxn) (err error) { _, err2 = mergesort.SortBlockColumns(cols, pkidx, c.r.rt.VectorPool.Transient) return } - c.r.catalog.RelayFromSysTableObjects( + closeFn := c.r.catalog.RelayFromSysTableObjects( c.r.ctx, readTxn, c.dataF, tables.ReadSysTableBatch, sortFunc, + c, ) + c.wg.Wait() + for _, fn := range closeFn { + fn() + } + c.resetObjectCountMap() // logutil.Info(c.r.catalog.SimplePPString(common.PPL0)) return } @@ -481,6 +487,10 @@ func (c *CkpReplayer) Submit(tid uint64, replayFn func()) { c.objectReplayWorker[workerOffset].Enqueue(replayFn) } +func (c *CkpReplayer) resetObjectCountMap() { + c.objectCountMap = map[uint64]int{} +} + func (r *runner) Replay(dataFactory catalog.DataFactory) *CkpReplayer { replayer := &CkpReplayer{ r: r, diff --git a/pkg/vm/engine/tae/tables/table_scan.go b/pkg/vm/engine/tae/tables/table_scan.go index bc3f45da6b895..30b6ab1030c1d 100644 --- a/pkg/vm/engine/tae/tables/table_scan.go +++ b/pkg/vm/engine/tae/tables/table_scan.go @@ -142,6 +142,17 @@ func ReadSysTableBatch(ctx context.Context, entry *catalog.TableEntry, readTxn t panic(fmt.Sprintf("unsupported sys table id %v", entry)) } schema := entry.GetLastestSchema(false) + prefetchIt := entry.MakeObjectIt(false) + defer prefetchIt.Release() + for prefetchIt.Next() { + obj := prefetchIt.Item() + if !obj.IsVisible(readTxn) { + continue + } + for blkOffset := range obj.BlockCnt() { + obj.GetObjectData().Prefetch(uint16(blkOffset)) + } + } it := entry.MakeObjectIt(false) defer it.Release() colIdxes := make([]int, 0, len(schema.ColDefs))