Skip to content

Commit

Permalink
Cherry-pick #17038 and #17107 to main (#17175)
Browse files Browse the repository at this point in the history
Cherry-pick #17038 and #17107 to main

Approved by: @m-schen
  • Loading branch information
zengyan1 authored Jun 27, 2024
1 parent 196434e commit 3c90c50
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 72 deletions.
13 changes: 13 additions & 0 deletions pkg/sql/colexec/receiver_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"reflect"
"time"

"github.com/matrixorigin/matrixone/pkg/common/bitmap"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

Expand Down Expand Up @@ -224,3 +225,15 @@ func (r *ReceiverOperator) selectFromAllReg() (int, *process.RegisterMessage, bo
}
return chosen, msg, ok
}

func (r *ReceiverOperator) ReceiveBitmapFromChannel(ch chan *bitmap.Bitmap) *bitmap.Bitmap {
select {
case <-r.proc.Ctx.Done():
return nil
case bm, ok := <-ch:
if !ok {
return nil
}
return bm
}
}
14 changes: 7 additions & 7 deletions pkg/sql/colexec/right/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@ func (ctr *container) sendLast(ap *Argument, proc *process.Process, analyze proc
ap.Channel <- ctr.matched
return true, nil
} else {
cnt := 1
for v := range ap.Channel {
ctr.matched.Or(v)
cnt++
if cnt == int(ap.NumCPU) {
close(ap.Channel)
break
for cnt := 1; cnt < int(ap.NumCPU); cnt++ {
v := ctr.ReceiveBitmapFromChannel(ap.Channel)
if v != nil {
ctr.matched.Or(v)
} else {
return true, nil
}
}
close(ap.Channel)
}
}

Expand Down
13 changes: 2 additions & 11 deletions pkg/sql/colexec/right/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,8 @@ func (arg *Argument) Reset(proc *process.Process, pipelineFailed bool, err error
func (arg *Argument) Free(proc *process.Process, pipelineFailed bool, err error) {
ctr := arg.ctr
if ctr != nil {
if !ctr.handledLast {
if arg.NumCPU > 0 {
if arg.IsMerger {
for i := uint64(1); i < arg.NumCPU; i++ {
<-arg.Channel
}
} else {
arg.Channel <- ctr.matched
}
}
ctr.handledLast = true
if !ctr.handledLast && arg.NumCPU > 1 && !arg.IsMerger {
arg.Channel <- nil
}
ctr.cleanBatch(proc)
ctr.cleanHashMap()
Expand Down
18 changes: 9 additions & 9 deletions pkg/sql/colexec/rightanti/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,16 @@ func (ctr *container) sendLast(ap *Argument, proc *process.Process, analyze proc
if !ap.IsMerger {
ap.Channel <- ctr.matched
return true, nil
}

cnt := 1
for v := range ap.Channel {
ctr.matched.Or(v)
cnt++
if cnt == int(ap.NumCPU) {
close(ap.Channel)
break
} else {
for cnt := 1; cnt < int(ap.NumCPU); cnt++ {
v := ctr.ReceiveBitmapFromChannel(ap.Channel)
if v != nil {
ctr.matched.Or(v)
} else {
return true, nil
}
}
close(ap.Channel)
}
}

Expand Down
13 changes: 2 additions & 11 deletions pkg/sql/colexec/rightanti/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,8 @@ func (arg *Argument) Reset(proc *process.Process, pipelineFailed bool, err error
func (arg *Argument) Free(proc *process.Process, pipelineFailed bool, err error) {
ctr := arg.ctr
if ctr != nil {
if !ctr.handledLast {
if arg.NumCPU > 0 {
if arg.IsMerger {
for i := uint64(1); i < arg.NumCPU; i++ {
<-arg.Channel
}
} else {
arg.Channel <- ctr.matched
}
}
ctr.handledLast = true
if !ctr.handledLast && arg.NumCPU > 1 && !arg.IsMerger {
arg.Channel <- nil
}
ctr.cleanBatch(proc)
ctr.cleanEvalVectors()
Expand Down
29 changes: 6 additions & 23 deletions pkg/sql/colexec/rightsemi/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ package rightsemi

import (
"bytes"
"time"

"github.com/matrixorigin/matrixone/pkg/common/bitmap"
"github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
Expand Down Expand Up @@ -193,33 +191,18 @@ func (ctr *container) sendLast(ap *Argument, proc *process.Process, analyze proc

if ap.NumCPU > 1 {
if !ap.IsMerger {

sendStart := time.Now()
ap.Channel <- ctr.matched
analyze.WaitStop(sendStart)

return true, nil
} else {
cnt := 1

receiveStart := time.Now()

// The original code didn't handle the context correctly and would cause the system to HUNG!
for completed := true; completed; {
select {
case <-proc.Ctx.Done():
return true, moerr.NewInternalError(proc.Ctx, "query has been closed early")
case v := <-ap.Channel:
for cnt := 1; cnt < int(ap.NumCPU); cnt++ {
v := ctr.ReceiveBitmapFromChannel(ap.Channel)
if v != nil {
ctr.matched.Or(v)
cnt++
if cnt == int(ap.NumCPU) {
close(ap.Channel)
completed = false
}
} else {
return true, nil
}
}
analyze.WaitStop(receiveStart)

close(ap.Channel)
}
}

Expand Down
13 changes: 2 additions & 11 deletions pkg/sql/colexec/rightsemi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,8 @@ func (arg *Argument) Reset(proc *process.Process, pipelineFailed bool, err error
func (arg *Argument) Free(proc *process.Process, pipelineFailed bool, err error) {
ctr := arg.ctr
if ctr != nil {
if !ctr.handledLast {
if arg.NumCPU > 0 {
if arg.IsMerger {
for i := uint64(1); i < arg.NumCPU; i++ {
<-arg.Channel
}
} else {
arg.Channel <- ctr.matched
}
}
ctr.handledLast = true
if !ctr.handledLast && arg.NumCPU > 1 && !arg.IsMerger {
arg.Channel <- nil
}
ctr.cleanBatch(proc)
ctr.cleanEvalVectors()
Expand Down

0 comments on commit 3c90c50

Please sign in to comment.