From 011d4d6453e55d439ce3f9c8c750063ef1cca123 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 9 Jan 2025 14:12:19 +0800 Subject: [PATCH 01/10] add prefetch --- pkg/vm/engine/tae/tables/table_scan.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/vm/engine/tae/tables/table_scan.go b/pkg/vm/engine/tae/tables/table_scan.go index bc3f45da6b895..30b6ab1030c1d 100644 --- a/pkg/vm/engine/tae/tables/table_scan.go +++ b/pkg/vm/engine/tae/tables/table_scan.go @@ -142,6 +142,17 @@ func ReadSysTableBatch(ctx context.Context, entry *catalog.TableEntry, readTxn t panic(fmt.Sprintf("unsupported sys table id %v", entry)) } schema := entry.GetLastestSchema(false) + prefetchIt := entry.MakeObjectIt(false) + defer prefetchIt.Release() + for prefetchIt.Next() { + obj := prefetchIt.Item() + if !obj.IsVisible(readTxn) { + continue + } + for blkOffset := range obj.BlockCnt() { + obj.GetObjectData().Prefetch(uint16(blkOffset)) + } + } it := entry.MakeObjectIt(false) defer it.Release() colIdxes := make([]int, 0, len(schema.ColDefs)) From 9156232196584dd2085810bdb956105396b82069 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 9 Jan 2025 14:56:06 +0800 Subject: [PATCH 02/10] replay catalog in parallel --- pkg/vm/engine/tae/catalog/catalogreplay.go | 115 +++++++++++++-------- pkg/vm/engine/tae/catalog/schema.go | 24 +++-- pkg/vm/engine/tae/db/checkpoint/replay.go | 19 +++- 3 files changed, 104 insertions(+), 54 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 3d18f37af297d..a1f6bfd90aba0 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -221,7 +221,9 @@ func (catalog *Catalog) RelayFromSysTableObjects( dataFactory DataFactory, readFunc func(context.Context, *TableEntry, txnif.AsyncTxn) *containers.Batch, sortFunc func([]containers.Vector, int) error, -) { + replayer ObjectListReplayer, +) (closeCB []func()) { + closeCB = make([]func(), 0) db, err := catalog.GetDatabaseByID(pkgcatalog.MO_CATALOG_ID) if err != nil { panic(err) @@ -265,8 +267,8 @@ func (catalog *Catalog) RelayFromSysTableObjects( // replay database catalog if dbBatch := readFunc(ctx, dbTbl, readTxn); dbBatch != nil { - defer dbBatch.Close() - catalog.ReplayMODatabase(ctx, txnNode, dbBatch) + closeCB = append(closeCB, dbBatch.Close) + catalog.ReplayMODatabase(ctx, txnNode, dbBatch, replayer) } // replay table catalog @@ -274,28 +276,37 @@ func (catalog *Catalog) RelayFromSysTableObjects( if err := sortFunc(tableBatch.Vecs, pkgcatalog.MO_TABLES_REL_ID_IDX); err != nil { panic(err) } - defer tableBatch.Close() + closeCB = append(closeCB, tableBatch.Close) columnBatch := readFunc(ctx, columnTbl, readTxn) if err := sortFunc(columnBatch.Vecs, pkgcatalog.MO_COLUMNS_ATT_RELNAME_ID_IDX); err != nil { panic(err) } - defer columnBatch.Close() - catalog.ReplayMOTables(ctx, txnNode, dataFactory, tableBatch, columnBatch) + closeCB = append(closeCB, columnBatch.Close) + catalog.ReplayMOTables(ctx, txnNode, dataFactory, tableBatch, columnBatch, replayer) } // logutil.Info(catalog.SimplePPString(common.PPL3)) + return } -func (catalog *Catalog) ReplayMODatabase(ctx context.Context, txnNode *txnbase.TxnMVCCNode, bat *containers.Batch) { +func (catalog *Catalog) ReplayMODatabase(ctx context.Context, txnNode *txnbase.TxnMVCCNode, bat *containers.Batch, replayer ObjectListReplayer) { + dbids := vector.MustFixedColNoTypeCheck[uint64](bat.GetVectorByName(pkgcatalog.SystemDBAttr_ID).GetDownstreamVector()) + tenantIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_AccID).GetDownstreamVector()) + userIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_Creator).GetDownstreamVector()) + roleIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_Owner).GetDownstreamVector()) + createAts := vector.MustFixedColNoTypeCheck[types.Timestamp](bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateAt).GetDownstreamVector()) for i := 0; i < bat.Length(); i++ { - dbid := bat.GetVectorByName(pkgcatalog.SystemDBAttr_ID).Get(i).(uint64) - name := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).Get(i).([]byte)) - tenantID := bat.GetVectorByName(pkgcatalog.SystemDBAttr_AccID).Get(i).(uint32) - userID := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Creator).Get(i).(uint32) - roleID := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Owner).Get(i).(uint32) - createAt := bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateAt).Get(i).(types.Timestamp) - createSql := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).Get(i).([]byte)) - datType := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).Get(i).([]byte)) - catalog.onReplayCreateDB(dbid, name, txnNode, tenantID, userID, roleID, createAt, createSql, datType) + replayFn := func() { + dbid := dbids[i] + name := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).Get(i).([]byte)) + tenantID := tenantIDs[i] + userID := userIDs[i] + roleID := roleIDs[i] + createAt := createAts[i] + createSql := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).Get(i).([]byte)) + datType := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).Get(i).([]byte)) + catalog.onReplayCreateDB(dbid, name, txnNode, tenantID, userID, roleID, createAt, createSql, datType) + } + replayer.Submit(0, replayFn) } } @@ -336,34 +347,56 @@ func (catalog *Catalog) onReplayCreateDB( db.InsertLocked(un) } -func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.TxnMVCCNode, dataF DataFactory, tblBat, colBat *containers.Batch) { +func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.TxnMVCCNode, dataF DataFactory, tblBat, colBat *containers.Batch, replayer ObjectListReplayer) { + tids := vector.MustFixedColNoTypeCheck[uint64](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ID).GetDownstreamVector()) + dbids := vector.MustFixedColNoTypeCheck[uint64](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_DBID).GetDownstreamVector()) + versions := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Version).GetDownstreamVector()) + catalogVersions := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CatalogVersion).GetDownstreamVector()) + partitioneds := vector.MustFixedColNoTypeCheck[int8](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partitioned).GetDownstreamVector()) + roleIDs := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Owner).GetDownstreamVector()) + userIDs := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Creator).GetDownstreamVector()) + createAts := vector.MustFixedColNoTypeCheck[types.Timestamp](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateAt).GetDownstreamVector()) + tenantIDs := vector.MustFixedColNoTypeCheck[uint32](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_AccID).GetDownstreamVector()) + + colTids := vector.MustFixedColNoTypeCheck[uint64](colBat.GetVectorByName(pkgcatalog.SystemColAttr_RelID).GetDownstreamVector()) + nullables := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_NullAbility).GetDownstreamVector()) + isHiddens := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_IsHidden).GetDownstreamVector()) + clusterbys := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_IsClusterBy).GetDownstreamVector()) + autoIncrements := vector.MustFixedColNoTypeCheck[int8](colBat.GetVectorByName(pkgcatalog.SystemColAttr_IsAutoIncrement).GetDownstreamVector()) + idxes := vector.MustFixedColNoTypeCheck[int32](colBat.GetVectorByName(pkgcatalog.SystemColAttr_Num).GetDownstreamVector()) + seqNums := vector.MustFixedColNoTypeCheck[uint16](colBat.GetVectorByName(pkgcatalog.SystemColAttr_Seqnum).GetDownstreamVector()) + schemaOffset := 0 for i := 0; i < tblBat.Length(); i++ { - tid := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ID).Get(i).(uint64) - dbid := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_DBID).Get(i).(uint64) - name := string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte)) - schema := NewEmptySchema(name) - schemaOffset = schema.ReadFromBatch(colBat, schemaOffset, tid) - schema.Comment = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte)) - schema.Version = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Version).Get(i).(uint32) - schema.CatalogVersion = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CatalogVersion).Get(i).(uint32) - schema.Partitioned = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partitioned).Get(i).(int8) - schema.Partition = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte)) - schema.Relkind = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte)) - schema.Createsql = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte)) - schema.View = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte)) - schema.Constraint = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte) - schema.AcInfo = accessInfo{} - schema.AcInfo.RoleID = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Owner).Get(i).(uint32) - schema.AcInfo.UserID = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Creator).Get(i).(uint32) - schema.AcInfo.CreateAt = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateAt).Get(i).(types.Timestamp) - schema.AcInfo.TenantID = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_AccID).Get(i).(uint32) - extra := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte) - schema.MustRestoreExtra(extra) - if err := schema.Finalize(true); err != nil { - panic(err) + replayFn := func() { + tid := tids[i] + dbid := dbids[i] + name := string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte)) + schema := NewEmptySchema(name) + schemaOffset = schema.ReadFromBatch( + colBat, colTids, nullables, isHiddens, clusterbys, autoIncrements, idxes, seqNums, schemaOffset, tid) + schema.Comment = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte)) + schema.Version = versions[i] + schema.CatalogVersion = catalogVersions[i] + schema.Partitioned = partitioneds[i] + schema.Partition = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte)) + schema.Relkind = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte)) + schema.Createsql = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte)) + schema.View = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte)) + schema.Constraint = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte) + schema.AcInfo = accessInfo{} + schema.AcInfo.RoleID = roleIDs[i] + schema.AcInfo.UserID = userIDs[i] + schema.AcInfo.CreateAt = createAts[i] + schema.AcInfo.TenantID = tenantIDs[i] + extra := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte) + schema.MustRestoreExtra(extra) + if err := schema.Finalize(true); err != nil { + panic(err) + } + catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF) } - catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF) + replayer.Submit(0, replayFn) } } diff --git a/pkg/vm/engine/tae/catalog/schema.go b/pkg/vm/engine/tae/catalog/schema.go index c3c920a589364..6e9ce64ada74a 100644 --- a/pkg/vm/engine/tae/catalog/schema.go +++ b/pkg/vm/engine/tae/catalog/schema.go @@ -603,9 +603,15 @@ func (s *Schema) Marshal() (buf []byte, err error) { return } -func (s *Schema) ReadFromBatch(bat *containers.Batch, offset int, targetTid uint64) (next int) { +func (s *Schema) ReadFromBatch( + bat *containers.Batch, + tids []uint64, + nullables, isHiddens, clusterbys, autoIncrements []int8, + idxes []int32, + seqNums []uint16, + offset int, + targetTid uint64) (next int) { nameVec := bat.GetVectorByName(pkgcatalog.SystemColAttr_RelName) - tidVec := bat.GetVectorByName(pkgcatalog.SystemColAttr_RelID) defer func() { slices.SortStableFunc(s.ColDefs, func(i, j *ColDef) int { return i.Idx - j.Idx @@ -616,7 +622,7 @@ func (s *Schema) ReadFromBatch(bat *containers.Batch, offset int, targetTid uint break } name := string(nameVec.Get(offset).([]byte)) - id := tidVec.Get(offset).(uint64) + id := tids[offset] // every schema has 1 rowid column as last column, if have one, break if name != s.Name || targetTid != id { break @@ -625,22 +631,22 @@ func (s *Schema) ReadFromBatch(bat *containers.Batch, offset int, targetTid uint def.Name = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_Name)).Get(offset).([]byte)) data := bat.GetVectorByName((pkgcatalog.SystemColAttr_Type)).Get(offset).([]byte) types.Decode(data, &def.Type) - nullable := bat.GetVectorByName((pkgcatalog.SystemColAttr_NullAbility)).Get(offset).(int8) + nullable := nullables[offset] def.NullAbility = !i82bool(nullable) - isHidden := bat.GetVectorByName((pkgcatalog.SystemColAttr_IsHidden)).Get(offset).(int8) + isHidden := isHiddens[offset] def.Hidden = i82bool(isHidden) - isClusterBy := bat.GetVectorByName((pkgcatalog.SystemColAttr_IsClusterBy)).Get(offset).(int8) + isClusterBy := clusterbys[offset] def.ClusterBy = i82bool(isClusterBy) if def.ClusterBy { def.SortKey = true } - isAutoIncrement := bat.GetVectorByName((pkgcatalog.SystemColAttr_IsAutoIncrement)).Get(offset).(int8) + isAutoIncrement := autoIncrements[offset] def.AutoIncrement = i82bool(isAutoIncrement) def.Comment = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_Comment)).Get(offset).([]byte)) def.OnUpdate = bat.GetVectorByName((pkgcatalog.SystemColAttr_Update)).Get(offset).([]byte) def.Default = bat.GetVectorByName((pkgcatalog.SystemColAttr_DefaultExpr)).Get(offset).([]byte) - def.Idx = int(bat.GetVectorByName((pkgcatalog.SystemColAttr_Num)).Get(offset).(int32)) - 1 - def.SeqNum = bat.GetVectorByName(pkgcatalog.SystemColAttr_Seqnum).Get(offset).(uint16) + def.Idx = int(idxes[offset]) - 1 + def.SeqNum = seqNums[offset] def.EnumValues = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_EnumValues)).Get(offset).([]byte)) s.NameMap[def.Name] = def.Idx s.ColDefs = append(s.ColDefs, def) diff --git a/pkg/vm/engine/tae/db/checkpoint/replay.go b/pkg/vm/engine/tae/db/checkpoint/replay.go index f50aecd2dcf98..d964d4dc49762 100644 --- a/pkg/vm/engine/tae/db/checkpoint/replay.go +++ b/pkg/vm/engine/tae/db/checkpoint/replay.go @@ -17,6 +17,7 @@ package checkpoint import ( "context" "fmt" + "math/rand/v2" "sort" "sync" "time" @@ -394,13 +395,18 @@ func (c *CkpReplayer) ReplayCatalog(readTxn txnif.AsyncTxn) (err error) { _, err2 = mergesort.SortBlockColumns(cols, pkidx, c.r.rt.VectorPool.Transient) return } - c.r.catalog.RelayFromSysTableObjects( + closeFn := c.r.catalog.RelayFromSysTableObjects( c.r.ctx, readTxn, c.dataF, tables.ReadSysTableBatch, sortFunc, + c, ) + c.wg.Wait() + for _, fn := range closeFn { + fn() + } // logutil.Info(c.r.catalog.SimplePPString(common.PPL0)) return } @@ -476,9 +482,14 @@ func (c *CkpReplayer) ReplayObjectlist() (err error) { func (c *CkpReplayer) Submit(tid uint64, replayFn func()) { c.wg.Add(1) - workerOffset := tid % uint64(len(c.objectReplayWorker)) - c.objectCountMap[tid] = c.objectCountMap[tid] + 1 - c.objectReplayWorker[workerOffset].Enqueue(replayFn) + if tid == 0 { + workerOffset := rand.IntN(len(c.objectReplayWorker)) + c.objectReplayWorker[workerOffset].Enqueue(replayFn) + } else { + workerOffset := tid % uint64(len(c.objectReplayWorker)) + c.objectCountMap[tid] = c.objectCountMap[tid] + 1 + c.objectReplayWorker[workerOffset].Enqueue(replayFn) + } } func (r *runner) Replay(dataFactory catalog.DataFactory) *CkpReplayer { From 49d31b524779eb0346866282ce5c57b8ee390649 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 9 Jan 2025 15:56:23 +0800 Subject: [PATCH 03/10] fix --- pkg/vm/engine/tae/catalog/catalogreplay.go | 27 ++++++++++------------ pkg/vm/engine/tae/db/checkpoint/replay.go | 16 ++++++------- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index a1f6bfd90aba0..b4b667dfe7574 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -268,7 +268,7 @@ func (catalog *Catalog) RelayFromSysTableObjects( // replay database catalog if dbBatch := readFunc(ctx, dbTbl, readTxn); dbBatch != nil { closeCB = append(closeCB, dbBatch.Close) - catalog.ReplayMODatabase(ctx, txnNode, dbBatch, replayer) + catalog.ReplayMODatabase(ctx, txnNode, dbBatch) } // replay table catalog @@ -288,25 +288,22 @@ func (catalog *Catalog) RelayFromSysTableObjects( return } -func (catalog *Catalog) ReplayMODatabase(ctx context.Context, txnNode *txnbase.TxnMVCCNode, bat *containers.Batch, replayer ObjectListReplayer) { +func (catalog *Catalog) ReplayMODatabase(ctx context.Context, txnNode *txnbase.TxnMVCCNode, bat *containers.Batch) { dbids := vector.MustFixedColNoTypeCheck[uint64](bat.GetVectorByName(pkgcatalog.SystemDBAttr_ID).GetDownstreamVector()) tenantIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_AccID).GetDownstreamVector()) userIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_Creator).GetDownstreamVector()) roleIDs := vector.MustFixedColNoTypeCheck[uint32](bat.GetVectorByName(pkgcatalog.SystemDBAttr_Owner).GetDownstreamVector()) createAts := vector.MustFixedColNoTypeCheck[types.Timestamp](bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateAt).GetDownstreamVector()) for i := 0; i < bat.Length(); i++ { - replayFn := func() { - dbid := dbids[i] - name := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).Get(i).([]byte)) - tenantID := tenantIDs[i] - userID := userIDs[i] - roleID := roleIDs[i] - createAt := createAts[i] - createSql := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).Get(i).([]byte)) - datType := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).Get(i).([]byte)) - catalog.onReplayCreateDB(dbid, name, txnNode, tenantID, userID, roleID, createAt, createSql, datType) - } - replayer.Submit(0, replayFn) + dbid := dbids[i] + name := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).Get(i).([]byte)) + tenantID := tenantIDs[i] + userID := userIDs[i] + roleID := roleIDs[i] + createAt := createAts[i] + createSql := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).Get(i).([]byte)) + datType := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).Get(i).([]byte)) + catalog.onReplayCreateDB(dbid, name, txnNode, tenantID, userID, roleID, createAt, createSql, datType) } } @@ -396,7 +393,7 @@ func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.Txn } catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF) } - replayer.Submit(0, replayFn) + replayer.Submit(dbids[i], replayFn) } } diff --git a/pkg/vm/engine/tae/db/checkpoint/replay.go b/pkg/vm/engine/tae/db/checkpoint/replay.go index d964d4dc49762..2006896916b6c 100644 --- a/pkg/vm/engine/tae/db/checkpoint/replay.go +++ b/pkg/vm/engine/tae/db/checkpoint/replay.go @@ -407,6 +407,7 @@ func (c *CkpReplayer) ReplayCatalog(readTxn txnif.AsyncTxn) (err error) { for _, fn := range closeFn { fn() } + c.resetObjectCountMap() // logutil.Info(c.r.catalog.SimplePPString(common.PPL0)) return } @@ -482,14 +483,13 @@ func (c *CkpReplayer) ReplayObjectlist() (err error) { func (c *CkpReplayer) Submit(tid uint64, replayFn func()) { c.wg.Add(1) - if tid == 0 { - workerOffset := rand.IntN(len(c.objectReplayWorker)) - c.objectReplayWorker[workerOffset].Enqueue(replayFn) - } else { - workerOffset := tid % uint64(len(c.objectReplayWorker)) - c.objectCountMap[tid] = c.objectCountMap[tid] + 1 - c.objectReplayWorker[workerOffset].Enqueue(replayFn) - } + workerOffset := tid % uint64(len(c.objectReplayWorker)) + c.objectCountMap[tid] = c.objectCountMap[tid] + 1 + c.objectReplayWorker[workerOffset].Enqueue(replayFn) +} + +func (c *CkpReplayer) resetObjectCountMap() { + c.objectCountMap = map[uint64]int{} } func (r *runner) Replay(dataFactory catalog.DataFactory) *CkpReplayer { From baae609e48e47dbd7a346f5c87a3c56e2e7d7184 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 9 Jan 2025 15:58:40 +0800 Subject: [PATCH 04/10] fix --- pkg/vm/engine/tae/db/checkpoint/replay.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/vm/engine/tae/db/checkpoint/replay.go b/pkg/vm/engine/tae/db/checkpoint/replay.go index 2006896916b6c..59c7ccbf2ab5d 100644 --- a/pkg/vm/engine/tae/db/checkpoint/replay.go +++ b/pkg/vm/engine/tae/db/checkpoint/replay.go @@ -17,7 +17,6 @@ package checkpoint import ( "context" "fmt" - "math/rand/v2" "sort" "sync" "time" From 1fe6755b775432691780b877a59cd68df2ed2315 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 9 Jan 2025 16:08:35 +0800 Subject: [PATCH 05/10] copy byte --- pkg/vm/engine/tae/catalog/catalogreplay.go | 28 +++++++++++++--------- pkg/vm/engine/tae/catalog/schema.go | 16 ++++++------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index b4b667dfe7574..2c40465d31a54 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -296,13 +296,13 @@ func (catalog *Catalog) ReplayMODatabase(ctx context.Context, txnNode *txnbase.T createAts := vector.MustFixedColNoTypeCheck[types.Timestamp](bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateAt).GetDownstreamVector()) for i := 0; i < bat.Length(); i++ { dbid := dbids[i] - name := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).Get(i).([]byte)) + name := string(copyBytes(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).Get(i).([]byte))) tenantID := tenantIDs[i] userID := userIDs[i] roleID := roleIDs[i] createAt := createAts[i] - createSql := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).Get(i).([]byte)) - datType := string(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).Get(i).([]byte)) + createSql := string(copyBytes(bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).Get(i).([]byte))) + datType := string(copyBytes(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).Get(i).([]byte))) catalog.onReplayCreateDB(dbid, name, txnNode, tenantID, userID, roleID, createAt, createSql, datType) } } @@ -344,6 +344,12 @@ func (catalog *Catalog) onReplayCreateDB( db.InsertLocked(un) } +func copyBytes(src []byte) []byte { + ret := make([]byte, len(src)) + copy(ret, src) + return ret +} + func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.TxnMVCCNode, dataF DataFactory, tblBat, colBat *containers.Batch, replayer ObjectListReplayer) { tids := vector.MustFixedColNoTypeCheck[uint64](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ID).GetDownstreamVector()) dbids := vector.MustFixedColNoTypeCheck[uint64](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_DBID).GetDownstreamVector()) @@ -368,25 +374,25 @@ func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.Txn replayFn := func() { tid := tids[i] dbid := dbids[i] - name := string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte)) + name := string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte))) schema := NewEmptySchema(name) schemaOffset = schema.ReadFromBatch( colBat, colTids, nullables, isHiddens, clusterbys, autoIncrements, idxes, seqNums, schemaOffset, tid) - schema.Comment = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte)) + schema.Comment = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte))) schema.Version = versions[i] schema.CatalogVersion = catalogVersions[i] schema.Partitioned = partitioneds[i] - schema.Partition = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte)) - schema.Relkind = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte)) - schema.Createsql = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte)) - schema.View = string(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte)) - schema.Constraint = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte) + schema.Partition = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte))) + schema.Relkind = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte))) + schema.Createsql = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte))) + schema.View = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte))) + schema.Constraint = copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte)) schema.AcInfo = accessInfo{} schema.AcInfo.RoleID = roleIDs[i] schema.AcInfo.UserID = userIDs[i] schema.AcInfo.CreateAt = createAts[i] schema.AcInfo.TenantID = tenantIDs[i] - extra := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte) + extra := copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte)) schema.MustRestoreExtra(extra) if err := schema.Finalize(true); err != nil { panic(err) diff --git a/pkg/vm/engine/tae/catalog/schema.go b/pkg/vm/engine/tae/catalog/schema.go index 6e9ce64ada74a..e095e12d2ce24 100644 --- a/pkg/vm/engine/tae/catalog/schema.go +++ b/pkg/vm/engine/tae/catalog/schema.go @@ -621,15 +621,15 @@ func (s *Schema) ReadFromBatch( if offset >= nameVec.Length() { break } - name := string(nameVec.Get(offset).([]byte)) + name := string(copyBytes(nameVec.Get(offset).([]byte))) id := tids[offset] // every schema has 1 rowid column as last column, if have one, break if name != s.Name || targetTid != id { break } def := new(ColDef) - def.Name = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_Name)).Get(offset).([]byte)) - data := bat.GetVectorByName((pkgcatalog.SystemColAttr_Type)).Get(offset).([]byte) + def.Name = string(copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_Name)).Get(offset).([]byte))) + data := copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_Type)).Get(offset).([]byte)) types.Decode(data, &def.Type) nullable := nullables[offset] def.NullAbility = !i82bool(nullable) @@ -642,18 +642,18 @@ func (s *Schema) ReadFromBatch( } isAutoIncrement := autoIncrements[offset] def.AutoIncrement = i82bool(isAutoIncrement) - def.Comment = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_Comment)).Get(offset).([]byte)) - def.OnUpdate = bat.GetVectorByName((pkgcatalog.SystemColAttr_Update)).Get(offset).([]byte) - def.Default = bat.GetVectorByName((pkgcatalog.SystemColAttr_DefaultExpr)).Get(offset).([]byte) + def.Comment = string(copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_Comment)).Get(offset).([]byte))) + def.OnUpdate = copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_Update)).Get(offset).([]byte)) + def.Default = copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_DefaultExpr)).Get(offset).([]byte)) def.Idx = int(idxes[offset]) - 1 def.SeqNum = seqNums[offset] - def.EnumValues = string(bat.GetVectorByName((pkgcatalog.SystemColAttr_EnumValues)).Get(offset).([]byte)) + def.EnumValues = string(copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_EnumValues)).Get(offset).([]byte))) s.NameMap[def.Name] = def.Idx s.ColDefs = append(s.ColDefs, def) if def.Name == PhyAddrColumnName { def.PhyAddr = true } - constraint := string(bat.GetVectorByName(pkgcatalog.SystemColAttr_ConstraintType).Get(offset).([]byte)) + constraint := string(copyBytes(bat.GetVectorByName(pkgcatalog.SystemColAttr_ConstraintType).Get(offset).([]byte))) if constraint == "p" { def.SortKey = true def.Primary = true From 77e71ebaea08c3d010da3903dfb5f38f84605b2a Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 9 Jan 2025 16:23:21 +0800 Subject: [PATCH 06/10] fix --- pkg/vm/engine/tae/catalog/catalogreplay.go | 53 ++++++++++------------ 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 2c40465d31a54..108dfe7c3b86c 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -371,35 +371,32 @@ func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.Txn schemaOffset := 0 for i := 0; i < tblBat.Length(); i++ { - replayFn := func() { - tid := tids[i] - dbid := dbids[i] - name := string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte))) - schema := NewEmptySchema(name) - schemaOffset = schema.ReadFromBatch( - colBat, colTids, nullables, isHiddens, clusterbys, autoIncrements, idxes, seqNums, schemaOffset, tid) - schema.Comment = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte))) - schema.Version = versions[i] - schema.CatalogVersion = catalogVersions[i] - schema.Partitioned = partitioneds[i] - schema.Partition = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte))) - schema.Relkind = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte))) - schema.Createsql = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte))) - schema.View = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte))) - schema.Constraint = copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte)) - schema.AcInfo = accessInfo{} - schema.AcInfo.RoleID = roleIDs[i] - schema.AcInfo.UserID = userIDs[i] - schema.AcInfo.CreateAt = createAts[i] - schema.AcInfo.TenantID = tenantIDs[i] - extra := copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte)) - schema.MustRestoreExtra(extra) - if err := schema.Finalize(true); err != nil { - panic(err) - } - catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF) + tid := tids[i] + dbid := dbids[i] + name := string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte))) + schema := NewEmptySchema(name) + schemaOffset = schema.ReadFromBatch( + colBat, colTids, nullables, isHiddens, clusterbys, autoIncrements, idxes, seqNums, schemaOffset, tid) + schema.Comment = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte))) + schema.Version = versions[i] + schema.CatalogVersion = catalogVersions[i] + schema.Partitioned = partitioneds[i] + schema.Partition = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte))) + schema.Relkind = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte))) + schema.Createsql = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte))) + schema.View = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte))) + schema.Constraint = copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte)) + schema.AcInfo = accessInfo{} + schema.AcInfo.RoleID = roleIDs[i] + schema.AcInfo.UserID = userIDs[i] + schema.AcInfo.CreateAt = createAts[i] + schema.AcInfo.TenantID = tenantIDs[i] + extra := copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte)) + schema.MustRestoreExtra(extra) + if err := schema.Finalize(true); err != nil { + panic(err) } - replayer.Submit(dbids[i], replayFn) + catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF) } } From 6158b6083f919295cbed16755573527975a6e2b1 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 9 Jan 2025 16:52:07 +0800 Subject: [PATCH 07/10] fix --- pkg/vm/engine/tae/catalog/catalogreplay.go | 58 +++++++++++++--------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 108dfe7c3b86c..7234d37387ed3 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -371,32 +371,42 @@ func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.Txn schemaOffset := 0 for i := 0; i < tblBat.Length(); i++ { + startOffset := schemaOffset tid := tids[i] - dbid := dbids[i] - name := string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte))) - schema := NewEmptySchema(name) - schemaOffset = schema.ReadFromBatch( - colBat, colTids, nullables, isHiddens, clusterbys, autoIncrements, idxes, seqNums, schemaOffset, tid) - schema.Comment = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte))) - schema.Version = versions[i] - schema.CatalogVersion = catalogVersions[i] - schema.Partitioned = partitioneds[i] - schema.Partition = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte))) - schema.Relkind = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte))) - schema.Createsql = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte))) - schema.View = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte))) - schema.Constraint = copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte)) - schema.AcInfo = accessInfo{} - schema.AcInfo.RoleID = roleIDs[i] - schema.AcInfo.UserID = userIDs[i] - schema.AcInfo.CreateAt = createAts[i] - schema.AcInfo.TenantID = tenantIDs[i] - extra := copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte)) - schema.MustRestoreExtra(extra) - if err := schema.Finalize(true); err != nil { - panic(err) + for i := startOffset; i < len(colTids); i++ { + if tid != colTids[i] { + schemaOffset = i + break + } + } + replayFn := func() { + dbid := dbids[i] + name := string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte))) + schema := NewEmptySchema(name) + schema.ReadFromBatch( + colBat, colTids, nullables, isHiddens, clusterbys, autoIncrements, idxes, seqNums, startOffset, tid) + schema.Comment = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte))) + schema.Version = versions[i] + schema.CatalogVersion = catalogVersions[i] + schema.Partitioned = partitioneds[i] + schema.Partition = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte))) + schema.Relkind = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte))) + schema.Createsql = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte))) + schema.View = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte))) + schema.Constraint = copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte)) + schema.AcInfo = accessInfo{} + schema.AcInfo.RoleID = roleIDs[i] + schema.AcInfo.UserID = userIDs[i] + schema.AcInfo.CreateAt = createAts[i] + schema.AcInfo.TenantID = tenantIDs[i] + extra := copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte)) + schema.MustRestoreExtra(extra) + if err := schema.Finalize(true); err != nil { + panic(err) + } + catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF) } - catalog.onReplayCreateTable(dbid, tid, schema, txnNode, dataF) + replayer.Submit(dbids[i], replayFn) } } From edf265c50c9b909cb4a61935bc6b189b0d3e2703 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 9 Jan 2025 17:56:02 +0800 Subject: [PATCH 08/10] update --- pkg/vm/engine/tae/catalog/catalogreplay.go | 16 ++++++++-------- pkg/vm/engine/tae/catalog/schema.go | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 7234d37387ed3..2b345da1a2cd3 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -381,25 +381,25 @@ func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.Txn } replayFn := func() { dbid := dbids[i] - name := string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).Get(i).([]byte))) + name := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Name).GetDownstreamVector().GetStringAt(i) schema := NewEmptySchema(name) schema.ReadFromBatch( colBat, colTids, nullables, isHiddens, clusterbys, autoIncrements, idxes, seqNums, startOffset, tid) - schema.Comment = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).Get(i).([]byte))) + schema.Comment = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Comment).GetDownstreamVector().GetStringAt(i) schema.Version = versions[i] schema.CatalogVersion = catalogVersions[i] schema.Partitioned = partitioneds[i] - schema.Partition = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).Get(i).([]byte))) - schema.Relkind = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).Get(i).([]byte))) - schema.Createsql = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).Get(i).([]byte))) - schema.View = string(copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).Get(i).([]byte))) - schema.Constraint = copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).Get(i).([]byte)) + schema.Partition = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Partition).GetDownstreamVector().GetStringAt(i) + schema.Relkind = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Kind).GetDownstreamVector().GetStringAt(i) + schema.Createsql = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_CreateSQL).GetDownstreamVector().GetStringAt(i) + schema.View = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ViewDef).GetDownstreamVector().GetStringAt(i) + schema.Constraint = tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_Constraint).GetDownstreamVector().GetBytesAt(i) schema.AcInfo = accessInfo{} schema.AcInfo.RoleID = roleIDs[i] schema.AcInfo.UserID = userIDs[i] schema.AcInfo.CreateAt = createAts[i] schema.AcInfo.TenantID = tenantIDs[i] - extra := copyBytes(tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).Get(i).([]byte)) + extra := tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ExtraInfo).GetDownstreamVector().GetBytesAt(i) schema.MustRestoreExtra(extra) if err := schema.Finalize(true); err != nil { panic(err) diff --git a/pkg/vm/engine/tae/catalog/schema.go b/pkg/vm/engine/tae/catalog/schema.go index e095e12d2ce24..86101bf928d94 100644 --- a/pkg/vm/engine/tae/catalog/schema.go +++ b/pkg/vm/engine/tae/catalog/schema.go @@ -621,15 +621,15 @@ func (s *Schema) ReadFromBatch( if offset >= nameVec.Length() { break } - name := string(copyBytes(nameVec.Get(offset).([]byte))) + name := nameVec.GetDownstreamVector().GetStringAt(offset) id := tids[offset] // every schema has 1 rowid column as last column, if have one, break if name != s.Name || targetTid != id { break } def := new(ColDef) - def.Name = string(copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_Name)).Get(offset).([]byte))) - data := copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_Type)).Get(offset).([]byte)) + def.Name = bat.GetVectorByName((pkgcatalog.SystemColAttr_Name)).GetDownstreamVector().GetStringAt(offset) + data := bat.GetVectorByName((pkgcatalog.SystemColAttr_Type)).GetDownstreamVector().GetBytesAt(offset) types.Decode(data, &def.Type) nullable := nullables[offset] def.NullAbility = !i82bool(nullable) @@ -642,18 +642,18 @@ func (s *Schema) ReadFromBatch( } isAutoIncrement := autoIncrements[offset] def.AutoIncrement = i82bool(isAutoIncrement) - def.Comment = string(copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_Comment)).Get(offset).([]byte))) - def.OnUpdate = copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_Update)).Get(offset).([]byte)) - def.Default = copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_DefaultExpr)).Get(offset).([]byte)) + def.Comment = bat.GetVectorByName((pkgcatalog.SystemColAttr_Comment)).GetDownstreamVector().GetStringAt(offset) + def.OnUpdate = bat.GetVectorByName((pkgcatalog.SystemColAttr_Update)).GetDownstreamVector().GetBytesAt(offset) + def.Default = bat.GetVectorByName((pkgcatalog.SystemColAttr_DefaultExpr)).GetDownstreamVector().GetBytesAt(offset) def.Idx = int(idxes[offset]) - 1 def.SeqNum = seqNums[offset] - def.EnumValues = string(copyBytes(bat.GetVectorByName((pkgcatalog.SystemColAttr_EnumValues)).Get(offset).([]byte))) + def.EnumValues = bat.GetVectorByName((pkgcatalog.SystemColAttr_EnumValues)).GetDownstreamVector().GetStringAt(offset) s.NameMap[def.Name] = def.Idx s.ColDefs = append(s.ColDefs, def) if def.Name == PhyAddrColumnName { def.PhyAddr = true } - constraint := string(copyBytes(bat.GetVectorByName(pkgcatalog.SystemColAttr_ConstraintType).Get(offset).([]byte))) + constraint := bat.GetVectorByName(pkgcatalog.SystemColAttr_ConstraintType).GetDownstreamVector().GetStringAt(offset) if constraint == "p" { def.SortKey = true def.Primary = true From 6f3fad45d7a72706780d316fd0929c17a379bea8 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 9 Jan 2025 18:01:57 +0800 Subject: [PATCH 09/10] update --- pkg/vm/engine/tae/catalog/catalogreplay.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 2b345da1a2cd3..51aa893d58b7c 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -296,13 +296,13 @@ func (catalog *Catalog) ReplayMODatabase(ctx context.Context, txnNode *txnbase.T createAts := vector.MustFixedColNoTypeCheck[types.Timestamp](bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateAt).GetDownstreamVector()) for i := 0; i < bat.Length(); i++ { dbid := dbids[i] - name := string(copyBytes(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).Get(i).([]byte))) + name := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Name).GetDownstreamVector().GetStringAt(i) tenantID := tenantIDs[i] userID := userIDs[i] roleID := roleIDs[i] createAt := createAts[i] - createSql := string(copyBytes(bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).Get(i).([]byte))) - datType := string(copyBytes(bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).Get(i).([]byte))) + createSql := bat.GetVectorByName(pkgcatalog.SystemDBAttr_CreateSQL).GetDownstreamVector().GetStringAt(i) + datType := bat.GetVectorByName(pkgcatalog.SystemDBAttr_Type).GetDownstreamVector().GetStringAt(i) catalog.onReplayCreateDB(dbid, name, txnNode, tenantID, userID, roleID, createAt, createSql, datType) } } From 47595655094379e54bbe3dcdc56ced8b58099a6a Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 9 Jan 2025 18:03:42 +0800 Subject: [PATCH 10/10] fix sca problems --- pkg/vm/engine/tae/catalog/catalogreplay.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 51aa893d58b7c..b85c01f6bfd30 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -344,12 +344,6 @@ func (catalog *Catalog) onReplayCreateDB( db.InsertLocked(un) } -func copyBytes(src []byte) []byte { - ret := make([]byte, len(src)) - copy(ret, src) - return ret -} - func (catalog *Catalog) ReplayMOTables(ctx context.Context, txnNode *txnbase.TxnMVCCNode, dataF DataFactory, tblBat, colBat *containers.Batch, replayer ObjectListReplayer) { tids := vector.MustFixedColNoTypeCheck[uint64](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_ID).GetDownstreamVector()) dbids := vector.MustFixedColNoTypeCheck[uint64](tblBat.GetVectorByName(pkgcatalog.SystemRelAttr_DBID).GetDownstreamVector())