Skip to content

Commit

Permalink
Merge branch 'main' into fix-connect
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 10, 2025
2 parents e6f4e44 + 6301ec7 commit d1c6f70
Show file tree
Hide file tree
Showing 7 changed files with 854 additions and 782 deletions.
1,466 changes: 751 additions & 715 deletions pkg/pb/plan/plan.pb.go

Large diffs are not rendered by default.

37 changes: 10 additions & 27 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ func (c *Compile) compileQuery(qry *plan.Query) ([]*Scope, error) {
v2.TxnStatementCompileQueryHistogram.Observe(time.Since(start).Seconds())
}()

c.execType = plan2.GetExecType(c.pn.GetQuery(), c.getHaveDDL(), c.isPrepare, c.ncpu)
c.execType = plan2.GetExecType(c.pn.GetQuery(), c.getHaveDDL(), c.isPrepare)

n := getEngineNode(c)
if c.execType == plan2.ExecTypeTP || c.execType == plan2.ExecTypeAP_ONECN {
Expand All @@ -830,6 +830,8 @@ func (c *Compile) compileQuery(qry *plan.Query) ([]*Scope, error) {
return nil, cantCompileForPrepareErr
}

plan2.CalcQueryDOP(c.pn, int32(c.ncpu), len(c.cnList), c.execType)

c.initAnalyzeModule(qry)
// deal with sink scan first.
for i := len(qry.Steps) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -2082,7 +2084,7 @@ func (c *Compile) compileMinusAndIntersect(n *plan.Node, left []*Scope, right []
if c.IsSingleScope(left) && c.IsSingleScope(right) {
return c.compileTpMinusAndIntersect(left, right, nodeType)
}
rs := c.newScopeListOnCurrentCN(2, int(n.Stats.BlockNum))
rs := c.newScopeListOnCurrentCN(2, int(n.Stats.Dop))
rs = c.newScopeListForMinusAndIntersect(rs, left, right, n)

c.hasMergeOp = true
Expand Down Expand Up @@ -2953,7 +2955,7 @@ func (c *Compile) compileShuffleGroupV2(n *plan.Node, inputSS []*Scope, nodes []
}

child := nodes[n.Children[0]]
dop := plan2.GetShuffleDop(c.ncpu, len(c.cnList), n.Stats.HashmapStats.HashmapSize)
dop := int(n.Stats.Dop)
if dop != inputSS[0].NodeInfo.Mcpu {
if child.NodeType == plan.Node_TABLE_SCAN {
inputSS[0].NodeInfo.Mcpu = dop
Expand Down Expand Up @@ -3005,7 +3007,7 @@ func (c *Compile) compileShuffleGroup(n *plan.Node, inputSS []*Scope, nodes []*p
}

shuffleGroups := make([]*Scope, 0, len(c.cnList))
dop := plan2.GetShuffleDop(c.ncpu, len(c.cnList), n.Stats.HashmapStats.HashmapSize)
dop := int(n.Stats.Dop)
for _, cn := range c.cnList {
scopes := c.newScopeListWithNode(dop, len(inputSS), cn.Addr)
for _, s := range scopes {
Expand Down Expand Up @@ -3534,9 +3536,8 @@ func (c *Compile) newMergeScope(ss []*Scope) *Scope {
// newScopeListOnCurrentCN traverse the cnList and only generate Scope list for the current CN node
// waing: newScopeListOnCurrentCN result is only used to build Scope and add one merge operator.
// If other operators are added, please let @qingxinhome know
func (c *Compile) newScopeListOnCurrentCN(childrenCount int, blocks int) []*Scope {
func (c *Compile) newScopeListOnCurrentCN(childrenCount int, mcpu int) []*Scope {
node := getEngineNode(c)
mcpu := c.generateCPUNumber(node.Mcpu, blocks)
ss := c.newScopeListWithNode(mcpu, childrenCount, node.Addr)
return ss
}
Expand Down Expand Up @@ -3672,11 +3673,7 @@ func (c *Compile) newShuffleJoinScopeList(probeScopes, buildScopes []*Scope, n *
}
}

dop := c.ncpu //for dedup join, limit the dop max to ncpu
if n.JoinType != plan.Node_DEDUP {
dop = plan2.GetShuffleDop(c.ncpu, len(cnlist), n.Stats.HashmapStats.HashmapSize)
}

dop := int(n.Stats.Dop)
bucketNum := len(cnlist) * dop
shuffleProbes := make([]*Scope, 0, bucketNum)
shuffleBuilds := make([]*Scope, 0, bucketNum)
Expand Down Expand Up @@ -3787,20 +3784,6 @@ func (c *Compile) newShuffleJoinScopeList(probeScopes, buildScopes []*Scope, n *
return shuffleProbes
}

func (c *Compile) generateCPUNumber(cpunum, blocks int) int {
if cpunum <= 0 || blocks <= 16 || c.IsTpQuery() {
return 1
}
ret := blocks/16 + 1
if c.isPrepare {
ret = blocks/64 + 1
}
if ret <= cpunum {
return ret
}
return cpunum
}

func collectTombstones(
c *Compile,
node *plan.Node,
Expand Down Expand Up @@ -4001,13 +3984,13 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, error) {
var nodes engine.Nodes
// scan on current CN
if shouldScanOnCurrentCN(c, n, forceSingle) {
mcpu := c.generateCPUNumber(c.ncpu, int(n.Stats.BlockNum))
mcpu := n.Stats.Dop
if forceSingle {
mcpu = 1
}
nodes = append(nodes, engine.Node{
Addr: c.addr,
Mcpu: mcpu,
Mcpu: int(mcpu),
CNCNT: 1,
})
return nodes, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/plan/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func shuffleByZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap, bucke
}

var shuffleIDX uint64
if rsp.ShuffleRangeUint64 != nil {
if len(rsp.ShuffleRangeUint64) > 0 {
shuffleIDX = GetRangeShuffleIndexForZMUnsignedSlice(rsp.ShuffleRangeUint64, zm)
} else if rsp.ShuffleRangeInt64 != nil {
} else if len(rsp.ShuffleRangeInt64) > 0 {
shuffleIDX = GetRangeShuffleIndexForZMSignedSlice(rsp.ShuffleRangeInt64, zm)
} else {
shuffleIDX = GetRangeShuffleIndexForZM(rsp.Node.Stats.HashmapStats.ShuffleColMin, rsp.Node.Stats.HashmapStats.ShuffleColMax, zm, uint64(bucketNum))
Expand Down Expand Up @@ -656,7 +656,7 @@ func determineShuffleForGroupBy(n *plan.Node, builder *QueryBuilder) {

}

func GetShuffleDop(ncpu int, lencn int, hashmapSize float64) (dop int) {
func getShuffleDop(ncpu int, lencn int, hashmapSize float64) (dop int) {
if ncpu <= 4 {
ncpu = 4
}
Expand Down
54 changes: 27 additions & 27 deletions pkg/sql/plan/shuffle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,65 +204,65 @@ func TestRangeShuffleSlice(t *testing.T) {
}

func TestGetShuffleDop(t *testing.T) {
n := GetShuffleDop(1, 1, 100)
n := getShuffleDop(1, 1, 100)
require.Equal(t, 4, n)
n = GetShuffleDop(4, 2, 100000000000)
n = getShuffleDop(4, 2, 100000000000)
require.Equal(t, 16, n)
n = GetShuffleDop(64, 1, 10000000000000)
n = getShuffleDop(64, 1, 10000000000000)
require.Equal(t, 64, n)

n = GetShuffleDop(16, 3, 1500000000)
n = getShuffleDop(16, 3, 1500000000)
require.Equal(t, 40, n)
n = GetShuffleDop(16, 3, 150000000)
n = getShuffleDop(16, 3, 150000000)
require.Equal(t, 64, n)
n = GetShuffleDop(16, 3, 15000000)
n = getShuffleDop(16, 3, 15000000)
require.Equal(t, 32, n)
n = GetShuffleDop(16, 3, 1500000)
n = getShuffleDop(16, 3, 1500000)
require.Equal(t, 16, n)

n = GetShuffleDop(16, 4, 1500000000)
n = getShuffleDop(16, 4, 1500000000)
require.Equal(t, 64, n)
n = GetShuffleDop(16, 4, 150000000)
n = getShuffleDop(16, 4, 150000000)
require.Equal(t, 64, n)
n = GetShuffleDop(16, 4, 15000000)
n = getShuffleDop(16, 4, 15000000)
require.Equal(t, 32, n)
n = GetShuffleDop(16, 4, 1500000)
n = getShuffleDop(16, 4, 1500000)
require.Equal(t, 16, n)

n = GetShuffleDop(16, 3, 300000000)
n = getShuffleDop(16, 3, 300000000)
require.Equal(t, 64, n)
n = GetShuffleDop(16, 3, 30000000)
n = getShuffleDop(16, 3, 30000000)
require.Equal(t, 48, n)
n = GetShuffleDop(16, 3, 3000000)
n = getShuffleDop(16, 3, 3000000)
require.Equal(t, 16, n)
n = GetShuffleDop(16, 3, 300000)
n = getShuffleDop(16, 3, 300000)
require.Equal(t, 16, n)

n = GetShuffleDop(16, 4, 300000000)
n = getShuffleDop(16, 4, 300000000)
require.Equal(t, 64, n)
n = GetShuffleDop(16, 4, 30000000)
n = getShuffleDop(16, 4, 30000000)
require.Equal(t, 32, n)
n = GetShuffleDop(16, 4, 3000000)
n = getShuffleDop(16, 4, 3000000)
require.Equal(t, 16, n)
n = GetShuffleDop(16, 4, 300000)
n = getShuffleDop(16, 4, 300000)
require.Equal(t, 16, n)

n = GetShuffleDop(64, 1, 1500000000)
n = getShuffleDop(64, 1, 1500000000)
require.Equal(t, 64, n)
n = GetShuffleDop(64, 1, 150000000)
n = getShuffleDop(64, 1, 150000000)
require.Equal(t, 64, n)
n = GetShuffleDop(64, 1, 15000000)
n = getShuffleDop(64, 1, 15000000)
require.Equal(t, 64, n)
n = GetShuffleDop(64, 1, 1500000)
n = getShuffleDop(64, 1, 1500000)
require.Equal(t, 64, n)

n = GetShuffleDop(16, 1, 1500000000)
n = getShuffleDop(16, 1, 1500000000)
require.Equal(t, 64, n)
n = GetShuffleDop(16, 1, 150000000)
n = getShuffleDop(16, 1, 150000000)
require.Equal(t, 64, n)
n = GetShuffleDop(16, 1, 15000000)
n = getShuffleDop(16, 1, 15000000)
require.Equal(t, 48, n)
n = GetShuffleDop(16, 1, 1500000)
n = getShuffleDop(16, 1, 1500000)
require.Equal(t, 16, n)
}

Expand Down
50 changes: 47 additions & 3 deletions pkg/sql/plan/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,7 +1573,51 @@ func HasShuffleInPlan(qry *plan.Query) bool {
return false
}

func GetExecType(qry *plan.Query, txnHaveDDL bool, isPrepare bool, ncpu int) ExecType {
func calcDOP(ncpu, blocks int32, isPrepare bool) int32 {
if ncpu <= 0 || blocks <= 16 {
return 1
}
ret := blocks/16 + 1
if isPrepare {
ret = blocks/64 + 1
}
if ret <= ncpu {
return ret
}
return ncpu
}

func CalcNodeDOP(p *plan.Plan, rootID int32, ncpu int32, lencn int) {
qry := p.GetQuery()
node := qry.Nodes[rootID]
for i := range node.Children {
CalcNodeDOP(p, node.Children[i], ncpu, lencn)
}
if node.Stats.HashmapStats != nil && node.Stats.HashmapStats.Shuffle && node.NodeType != plan.Node_TABLE_SCAN {
if node.NodeType == plan.Node_JOIN && node.JoinType == plan.Node_DEDUP {
node.Stats.Dop = ncpu
} else {
node.Stats.Dop = int32(getShuffleDop(int(ncpu), lencn, node.Stats.HashmapStats.HashmapSize))
}
} else {
node.Stats.Dop = calcDOP(ncpu, node.Stats.BlockNum, p.IsPrepare)
}
}

func CalcQueryDOP(p *plan.Plan, ncpu int32, lencn int, typ ExecType) {
qry := p.GetQuery()
if typ == ExecTypeTP {
for i := range qry.Nodes {
qry.Nodes[i].Stats.Dop = 1
}
return
}
for i := range qry.Steps {
CalcNodeDOP(p, qry.Steps[i], ncpu, lencn)
}
}

func GetExecType(qry *plan.Query, txnHaveDDL bool, isPrepare bool) ExecType {
if GetForceScanOnMultiCN() {
return ExecTypeAP_MULTICN
}
Expand Down Expand Up @@ -1609,7 +1653,7 @@ func GetExecType(qry *plan.Query, txnHaveDDL bool, isPrepare bool, ncpu int) Exe

func GetPlanTitle(qry *plan.Query, txnHaveDDL bool) string {
ncpu := system.GoMaxProcs()
switch GetExecType(qry, txnHaveDDL, false, ncpu) {
switch GetExecType(qry, txnHaveDDL, false) {
case ExecTypeTP:
return "TP QUERY PLAN"
case ExecTypeAP_ONECN:
Expand All @@ -1622,7 +1666,7 @@ func GetPlanTitle(qry *plan.Query, txnHaveDDL bool) string {

func GetPhyPlanTitle(qry *plan.Query, txnHaveDDL bool) string {
ncpu := system.GoMaxProcs()
switch GetExecType(qry, txnHaveDDL, false, ncpu) {
switch GetExecType(qry, txnHaveDDL, false) {
case ExecTypeTP:
return "TP QUERY PHYPLAN"
case ExecTypeAP_ONECN:
Expand Down
17 changes: 13 additions & 4 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7609,7 +7609,7 @@ func TestCkpLeak(t *testing.T) {
if db.Runtime.Scheduler.GetPenddingLSNCnt() != 0 {
return
}
checkLeak := func() {
checkLeak := func() bool {
ckpMetaFiles := db.BGCheckpointRunner.GetCheckpointMetaFiles()
var cpt *ioutil.TSRangeFile
for ckpMetaFile := range ckpMetaFiles {
Expand All @@ -7618,19 +7618,24 @@ func TestCkpLeak(t *testing.T) {
logutil.Infof("compact file %v", file.GetName())
if cpt != nil {
logutil.Errorf("dup compacted files %v %v", cpt.GetName(), file.GetName())
assert.Fail(t, "dup compacted files")
return false
}
cpt = &file
}
}
return true
}
testutils.WaitExpect(5000, func() bool {
return db.DiskCleaner.GetCleaner().GetMinMerged() != nil
})
if db.DiskCleaner.GetCleaner().GetMinMerged() == nil {
return
}
checkLeak()
testutils.WaitExpect(5000, func() bool {
return checkLeak()
})
ok := checkLeak()
assert.True(t, ok)
tae.Restart(ctx)
assert.Equal(t, uint64(0), db.Runtime.Scheduler.GetPenddingLSNCnt())
testutils.WaitExpect(5000, func() bool {
Expand All @@ -7639,7 +7644,11 @@ func TestCkpLeak(t *testing.T) {
if db.DiskCleaner.GetCleaner().GetMinMerged() == nil {
return
}
checkLeak()
testutils.WaitExpect(5000, func() bool {
return checkLeak()
})
ok = checkLeak()
assert.True(t, ok)

}

Expand Down
6 changes: 3 additions & 3 deletions proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,9 @@ message Stats {
//for scan, selectivity means outcnt divide total count
double selectivity = 6;
bool forceOneCN = 7;

HashMapStats hashmapStats = 8;
string sql = 9;
int32 dop = 8;
HashMapStats hashmapStats = 9;
string sql = 10;
}

message RowsetExpr {
Expand Down

0 comments on commit d1c6f70

Please sign in to comment.