Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync to 1.2: fix Size() duplicate counting rows size. #17147

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 7 additions & 34 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package disttae

import (
"bytes"
"context"
"fmt"
"strconv"
Expand Down Expand Up @@ -204,11 +203,6 @@ func (tbl *txnTable) Rows(ctx context.Context) (uint64, error) {
}

func (tbl *txnTable) Size(ctx context.Context, columnName string) (uint64, error) {
var buf bytes.Buffer
defer func() {
logutil.Info("txnTable.Size", zap.String("", buf.String()))
}()

ts := types.TimestampToTS(tbl.db.op.SnapshotTS())
part, err := tbl.getPartitionState(ctx)
if err != nil {
Expand All @@ -231,7 +225,6 @@ func (tbl *txnTable) Size(ctx context.Context, columnName string) (uint64, error
return 0, moerr.NewInvalidInput(ctx, "bad input column name %v", columnName)
}

buf.WriteString("workspace: ")
deletes := make(map[types.Rowid]struct{})
tbl.getTxn().forEachTableWrites(
tbl.db.databaseId,
Expand All @@ -243,7 +236,6 @@ func (tbl *txnTable) Size(ctx context.Context, columnName string) (uint64, error
if _, ok := neededCols[s]; ok {
delta := uint64(entry.bat.Vecs[i].Size())
szInPart += delta
buf.WriteString(fmt.Sprintf("(I)%s-%d; ", s, delta))
}
}
} else {
Expand All @@ -261,45 +253,27 @@ func (tbl *txnTable) Size(ctx context.Context, columnName string) (uint64, error
vs := vector.MustFixedCol[types.Rowid](entry.bat.GetVector(0))
for _, v := range vs {
deletes[v] = struct{}{}
buf.WriteString(fmt.Sprintf("(D)%s; ", v.String()))
}
}
}
})

// Different rows may belong to same batch. So we have
// to record the batch which we have already handled to avoid
// repetitive computation
handled := make(map[*batch.Batch]struct{})
// Calculate the in mem size
// TODO: It might includ some deleted row size
buf.WriteString("\npartition rows: ")

iter := part.NewRowsIter(ts, nil, false)
defer func() { _ = iter.Close() }()
for iter.Next() {
entry := iter.Entry()
if _, ok := deletes[entry.RowID]; ok {
continue
}
if _, ok := handled[entry.Batch]; ok {
continue
}

for i, s := range entry.Batch.Attrs {
if _, ok := neededCols[s]; ok {
delta := uint64(entry.Batch.Vecs[i].Size())
delta := uint64(entry.Batch.Vecs[i].Size() / entry.Batch.Vecs[i].Length())
szInPart += delta
buf.WriteString(fmt.Sprintf("%s-%d; ", s, entry.Batch.Vecs[i].Size()))
}
}
handled[entry.Batch] = struct{}{}
}

//s := e.Stats(ctx, pb.StatsInfoKey{
// DatabaseID: tbl.db.databaseId,
// TableID: tbl.tableId,
//}, true)
buf.WriteString("\nstats: ")
s, _ := tbl.Stats(ctx, true)
if s == nil {
return szInPart, nil
Expand All @@ -309,14 +283,13 @@ func (tbl *txnTable) Size(ctx context.Context, columnName string) (uint64, error
for _, z := range s.SizeMap {
ret += z
}
buf.WriteString(fmt.Sprintf("%d\n", ret))
return ret + szInPart, nil
}
sz, ok := s.SizeMap[columnName]
if !ok {
return 0, moerr.NewInvalidInput(ctx, "bad input column name %v", columnName)
}
buf.WriteString(fmt.Sprintf("%d\n", sz))

return sz + szInPart, nil
}

Expand Down Expand Up @@ -727,10 +700,10 @@ var slowPathCounter atomic.Int64
func (tbl *txnTable) rangesOnePart(
ctx context.Context,
state *logtailreplay.PartitionState, // snapshot state of this transaction
tableDef *plan.TableDef, // table definition (schema)
exprs []*plan.Expr, // filter expression
outBlocks *objectio.BlockInfoSlice, // output marshaled block list after filtering
proc *process.Process, // process of this transaction
tableDef *plan.TableDef, // table definition (schema)
exprs []*plan.Expr, // filter expression
outBlocks *objectio.BlockInfoSlice, // output marshaled block list after filtering
proc *process.Process, // process of this transaction
txnOffset int,
) (err error) {
var done bool
Expand Down
Loading