Skip to content

Commit

Permalink
adjust dop of shuffle plan for future refactoring (#21180)
Browse files Browse the repository at this point in the history
adjust dop of shuffle plan for future refactoring

Approved by: @heni02, @ouyuanning
  • Loading branch information
badboynt1 authored Jan 11, 2025
1 parent 1c3a6e1 commit abe8c32
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 18 deletions.
12 changes: 1 addition & 11 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2954,17 +2954,7 @@ func (c *Compile) compileShuffleGroupV2(n *plan.Node, inputSS []*Scope, nodes []
return inputSS
}

child := nodes[n.Children[0]]
dop := int(n.Stats.Dop)
if dop != inputSS[0].NodeInfo.Mcpu {
if child.NodeType == plan.Node_TABLE_SCAN {
inputSS[0].NodeInfo.Mcpu = dop
} else {
dop = inputSS[0].NodeInfo.Mcpu
}
}

shuffleArg := constructShuffleArgForGroupV2(n, int32(dop))
shuffleArg := constructShuffleArgForGroupV2(n, n.Stats.Dop)
shuffleArg.SetAnalyzeControl(c.anal.curNodeIdx, false)
inputSS[0].setRootOperator(shuffleArg)

Expand Down
20 changes: 18 additions & 2 deletions pkg/sql/plan/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,16 @@ func calcDOP(ncpu, blocks int32, isPrepare bool) int32 {
return ncpu
}

// set node dop and left child recursively
func setNodeDOP(p *plan.Plan, rootID int32, dop int32) {
qry := p.GetQuery()
node := qry.Nodes[rootID]
if len(node.Children) > 0 {
setNodeDOP(p, node.Children[0], dop)
}
node.Stats.Dop = dop
}

func CalcNodeDOP(p *plan.Plan, rootID int32, ncpu int32, lencn int) {
qry := p.GetQuery()
node := qry.Nodes[rootID]
Expand All @@ -1595,9 +1605,15 @@ func CalcNodeDOP(p *plan.Plan, rootID int32, ncpu int32, lencn int) {
}
if node.Stats.HashmapStats != nil && node.Stats.HashmapStats.Shuffle && node.NodeType != plan.Node_TABLE_SCAN {
if node.NodeType == plan.Node_JOIN && node.JoinType == plan.Node_DEDUP {
node.Stats.Dop = ncpu
setNodeDOP(p, rootID, ncpu)
} else {
node.Stats.Dop = int32(getShuffleDop(int(ncpu), lencn, node.Stats.HashmapStats.HashmapSize))
dop := int32(getShuffleDop(int(ncpu), lencn, node.Stats.HashmapStats.HashmapSize))
childDop := qry.Nodes[node.Children[0]].Stats.Dop
if dop > childDop {
setNodeDOP(p, rootID, dop)
} else {
node.Stats.Dop = childDop
}
}
} else {
node.Stats.Dop = calcDOP(ncpu, node.Stats.BlockNum, p.IsPrepare)
Expand Down
29 changes: 26 additions & 3 deletions test/distributed/cases/optimizer/shuffle.result
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ mo_ctl(dn, flush, d1.t2)
select Sleep(1);
sleep(1)
0
prepare s from select * from t1 where 1=0;
execute s;
c1 c2 c3
explain select count(*) from t1,t2 where t1.c1=t2.c1;
AP QUERY PLAN ON MULTICN(4 core)
Project
Expand Down Expand Up @@ -296,4 +293,30 @@ select count(*) from t1 where c3!=0;
count(*)
4000002
drop table t3;
create table t4(c1 int not null, c2 int unsigned) cluster by c1;
insert into t4 select *,* from generate_series(1000000) g;
insert into t4 select result+10000000,result+10000000 from generate_series(1000000) g;
select mo_ctl('dn', 'flush', 'd1.t4');
mo_ctl(dn, flush, d1.t4)
{\n "method": "Flush",\n "result": [\n {\n "returnStr": "OK"\n }\n ]\n}\n
explain select count(*) as cnt from t4 group by c1 having cnt>1;
AP QUERY PLAN ON MULTICN(4 core)
Project
-> Aggregate
Group Key: t4.c1 shuffle: range(t4.c1)
Aggregate Functions: starcount(1)
Filter Cond: (count(*) > 1)
-> Table Scan on d1.t4
select count(*) as cnt from t4 group by c1 having cnt>1;
cnt
explain select count(*) as cnt from t4 group by c2 having cnt>1;
AP QUERY PLAN ON MULTICN(4 core)
Project
-> Aggregate
Group Key: t4.c2 shuffle: range(t4.c2)
Aggregate Functions: starcount(1)
Filter Cond: (count(*) > 1)
-> Table Scan on d1.t4
select count(*) as cnt from t4 group by c2 having cnt>1;
cnt
drop database if exists d1;
13 changes: 11 additions & 2 deletions test/distributed/cases/optimizer/shuffle.test
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ select mo_ctl('dn', 'flush', 'd1.t1');
-- @separator:table
select mo_ctl('dn', 'flush', 'd1.t2');
select Sleep(1);
prepare s from select * from t1 where 1=0;
execute s;
-- @separator:table
explain select count(*) from t1,t2 where t1.c1=t2.c1;
select count(*) from t1,t2 where t1.c1=t2.c1;
Expand Down Expand Up @@ -94,4 +92,15 @@ insert into t1 values(-1,-2,-3);
insert into t1 values(10,11,12);
select count(*) from t1 where c3!=0;
drop table t3;
create table t4(c1 int not null, c2 int unsigned) cluster by c1;
insert into t4 select *,* from generate_series(1000000) g;
insert into t4 select result+10000000,result+10000000 from generate_series(1000000) g;
-- @separator:table
select mo_ctl('dn', 'flush', 'd1.t4');
-- @separator:table
explain select count(*) as cnt from t4 group by c1 having cnt>1;
select count(*) as cnt from t4 group by c1 having cnt>1;
-- @separator:table
explain select count(*) as cnt from t4 group by c2 having cnt>1;
select count(*) as cnt from t4 group by c2 having cnt>1;
drop database if exists d1;

0 comments on commit abe8c32

Please sign in to comment.