Skip to content

Commit

Permalink
Resolve huang and fix some memory issues (#8076)
Browse files Browse the repository at this point in the history
* add compile broadcast join

Signed-off-by: Wu Qinxuan <[email protected]>

* add missing dup for batch

Signed-off-by: Wu Qinxuan <[email protected]>

* tmp saving for testing

Signed-off-by: Wu Qinxuan <[email protected]>

* fix done channel issue

Signed-off-by: Wu Qinxuan <[email protected]>

* adjust stats

* tmp saving

Signed-off-by: Wu Qinxuan <[email protected]>

* fix uuid to channel mapping issue

Signed-off-by: Wu Qinxuan <[email protected]>

* fix a bug will cause errCh deadlock

* fix mpool issue and channel register issue

Signed-off-by: Wu Qinxuan <[email protected]>

* fix channel regist issue

Signed-off-by: Wu Qinxuan <[email protected]>

* fix missing remove bug

Signed-off-by: Wu Qinxuan <[email protected]>

* fix checksum issue

Signed-off-by: Wu Qinxuan <[email protected]>

* remove remote addr check and some debug log

Signed-off-by: Wu Qinxuan <[email protected]>

* Update join_order.go

* Update join_order.go

* reduce memory

* imporve memory

* Fix some memory use bug

* Fix confict

* Fix sca

* Fix a slip of the pen

---------

Signed-off-by: Wu Qinxuan <[email protected]>
Co-authored-by: Wu Qinxuan <[email protected]>
Co-authored-by: badboynt1 <[email protected]>
Co-authored-by: chenmingsong <[email protected]>
Co-authored-by: maomao <[email protected]>
  • Loading branch information
5 people authored Feb 18, 2023
1 parent 621c631 commit 6d4bd17
Show file tree
Hide file tree
Showing 21 changed files with 444 additions and 348 deletions.
4 changes: 2 additions & 2 deletions pkg/common/hashmap/joinmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/plan"
)

func NewJoinMap(sels [][]int64, expr *plan.Expr, mp *StrHashMap, hasNull bool, idx *index.LowCardinalityIndex) *JoinMap {
func NewJoinMap(sels [][]int32, expr *plan.Expr, mp *StrHashMap, hasNull bool, idx *index.LowCardinalityIndex) *JoinMap {
cnt := int64(1)
return &JoinMap{
cnt: &cnt,
Expand All @@ -33,7 +33,7 @@ func NewJoinMap(sels [][]int64, expr *plan.Expr, mp *StrHashMap, hasNull bool, i
}
}

func (jm *JoinMap) Sels() [][]int64 {
func (jm *JoinMap) Sels() [][]int32 {
return jm.sels
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/common/hashmap/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Iterator interface {
type JoinMap struct {
cnt *int64
dupCnt *int64
sels [][]int64
sels [][]int32
// push-down filter expression, possibly a bloomfilter
expr *plan.Expr
mp *StrHashMap
Expand Down
41 changes: 32 additions & 9 deletions pkg/sql/colexec/dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"bytes"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

Expand All @@ -28,26 +30,32 @@ func String(arg any, buf *bytes.Buffer) {
func Prepare(proc *process.Process, arg any) error {
ap := arg.(*Argument)
ap.ctr = new(container)
ap.prepared = false
if len(ap.RemoteRegs) == 0 {
ap.prepared = true
ap.ctr.remoteReceivers = nil
} else {
ap.ctr.remoteReceivers = make([]*WrapperClientSession, 0, len(ap.RemoteRegs))
}

switch ap.FuncId {
case SendToAllFunc:
if len(ap.RemoteRegs) == 0 {
return moerr.NewInternalError(proc.Ctx, "SendToAllFunc should include RemoteRegs")
}
ap.prepared = false
ap.ctr.remoteReceivers = make([]*WrapperClientSession, 0, len(ap.RemoteRegs))
ap.ctr.sendFunc = sendToAllFunc
for _, rr := range ap.RemoteRegs {
colexec.Srv.PutNotifyChIntoUuidMap(rr.Uuid, proc.DispatchNotifyCh)
}

case SendToAllLocalFunc:
if !ap.prepared {
if len(ap.RemoteRegs) != 0 {
return moerr.NewInternalError(proc.Ctx, "SendToAllLocalFunc should not send to remote")
}
ap.prepared = true
ap.ctr.remoteReceivers = nil
ap.ctr.sendFunc = sendToAllLocalFunc
case SendToAnyLocalFunc:
if !ap.prepared {
if len(ap.RemoteRegs) != 0 {
return moerr.NewInternalError(proc.Ctx, "SendToAnyLocalFunc should not send to remote")
}
ap.prepared = true
ap.ctr.remoteReceivers = nil
ap.ctr.sendFunc = sendToAnyLocalFunc
default:
return moerr.NewInternalError(proc.Ctx, "wrong sendFunc id for dispatch")
Expand All @@ -67,6 +75,21 @@ func Call(idx int, proc *process.Process, arg any, isFirst bool, isLast bool) (b
return true, nil
}

if bat.Length() == 0 {
return false, nil
}

for i, vec := range bat.Vecs {
if vec.IsOriginal() {
cloneVec, err := vector.Dup(vec, proc.Mp())
if err != nil {
bat.Clean(proc.Mp())
return false, err
}
bat.Vecs[i] = cloneVec
}
}

if err := ap.ctr.sendFunc(bat, ap, proc); err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func newTestCase(all bool) dispatchTestCase {
{Oid: types.T_int8},
},
arg: &Argument{
FuncId: SendToAllFunc,
FuncId: SendToAllLocalFunc,
LocalRegs: []*process.WaitRegister{reg},
},
cancel: cancel,
Expand Down
67 changes: 25 additions & 42 deletions pkg/sql/colexec/dispatch/sendfunc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package dispatch

import (
"context"
"hash/crc32"
"sync/atomic"
"time"

Expand Down Expand Up @@ -60,7 +61,7 @@ func sendToAllLocalFunc(bat *batch.Batch, ap *Argument, proc *process.Process) e
// common sender: send to any LocalReceiver
func sendToAnyLocalFunc(bat *batch.Batch, ap *Argument, proc *process.Process) error {
// send to local receiver
sendto := ap.sendto % len(ap.LocalRegs)
sendto := ap.sendCnt % len(ap.LocalRegs)
reg := ap.LocalRegs[sendto]
select {
case <-reg.Ctx.Done():
Expand All @@ -74,59 +75,27 @@ func sendToAnyLocalFunc(bat *batch.Batch, ap *Argument, proc *process.Process) e
ap.LocalRegs = append(ap.LocalRegs[:sendto], ap.LocalRegs[sendto+1:]...)
return nil
case reg.Ch <- bat:
ap.sendto++
ap.sendCnt++
}

return nil
}

// common sender: send to all receiver
func sendToAllFunc(bat *batch.Batch, ap *Argument, proc *process.Process) error {
refCountAdd := int64(len(ap.LocalRegs) - 1)
atomic.AddInt64(&bat.Cnt, refCountAdd)
if jm, ok := bat.Ht.(*hashmap.JoinMap); ok {
jm.IncRef(refCountAdd)
jm.SetDupCount(int64(len(ap.LocalRegs)))
}

// if the remote receiver is not prepared, send to LocalRegs first
// and then waiting the remote receiver notify
if !ap.prepared {
for _, reg := range ap.LocalRegs {
select {
case <-reg.Ctx.Done():
return moerr.NewInternalError(proc.Ctx, "pipeline context has done.")
case reg.Ch <- bat:
}
}

// wait the remote notify
cnt := len(ap.RemoteRegs)
encodeData, errEncode := types.Encode(bat)
if errEncode != nil {
return errEncode
}
for cnt > 0 {
csinfo := <-proc.DispatchNotifyCh
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*10000)
_ = cancel

newWrapClientSession := &WrapperClientSession{
msgId: csinfo.MsgId,
ctx: timeoutCtx,
cs: csinfo.Cs,
uuid: csinfo.Uid,
}
// TODO: add check the receive info's correctness
if err := sendBatchToClientSession(encodeData, newWrapClientSession); err != nil {
return err
}
ap.ctr.remoteReceivers = append(ap.ctr.remoteReceivers, newWrapClientSession)
ap.ctr.remoteReceivers = append(ap.ctr.remoteReceivers, &WrapperClientSession{
msgId: csinfo.MsgId,
cs: csinfo.Cs,
uuid: csinfo.Uid,
doneCh: csinfo.DoneCh,
})
cnt--
}
ap.prepared = true

return nil
}

if ap.ctr.remoteReceivers != nil {
Expand All @@ -141,6 +110,13 @@ func sendToAllFunc(bat *batch.Batch, ap *Argument, proc *process.Process) error
}
}

refCountAdd := int64(len(ap.LocalRegs) - 1)
atomic.AddInt64(&bat.Cnt, refCountAdd)
if jm, ok := bat.Ht.(*hashmap.JoinMap); ok {
jm.IncRef(refCountAdd)
jm.SetDupCount(int64(len(ap.LocalRegs)))
}

for _, reg := range ap.LocalRegs {
select {
case <-reg.Ctx.Done():
Expand All @@ -153,15 +129,19 @@ func sendToAllFunc(bat *batch.Batch, ap *Argument, proc *process.Process) error
}

func sendBatchToClientSession(encodeBatData []byte, wcs *WrapperClientSession) error {
checksum := crc32.ChecksumIEEE(encodeBatData)
if len(encodeBatData) <= maxMessageSizeToMoRpc {
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*10000)
_ = cancel
msg := cnclient.AcquireMessage()
{
msg.Id = wcs.msgId
msg.Data = encodeBatData
msg.Cmd = pipeline.BatchMessage
msg.Sid = pipeline.BatchEnd
msg.Checksum = checksum
}
if err := wcs.cs.Write(wcs.ctx, msg); err != nil {
if err := wcs.cs.Write(timeoutCtx, msg); err != nil {
return err
}
return nil
Expand All @@ -175,15 +155,18 @@ func sendBatchToClientSession(encodeBatData []byte, wcs *WrapperClientSession) e
end = len(encodeBatData)
sid = pipeline.BatchEnd
}
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*10000)
_ = cancel
msg := cnclient.AcquireMessage()
{
msg.Id = wcs.msgId
msg.Data = encodeBatData[start:end]
msg.Cmd = pipeline.BatchMessage
msg.Sid = uint64(sid)
msg.Checksum = checksum
}

if err := wcs.cs.Write(wcs.ctx, msg); err != nil {
if err := wcs.cs.Write(timeoutCtx, msg); err != nil {
return err
}
start = end
Expand Down
17 changes: 13 additions & 4 deletions pkg/sql/colexec/dispatch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package dispatch

import (
"context"
"time"

"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
Expand All @@ -28,10 +30,10 @@ import (

type WrapperClientSession struct {
msgId uint64
ctx context.Context
cs morpc.ClientSession
uuid uuid.UUID
// toAddr string
doneCh chan struct{}
}
type container struct {
// the clientsession info for the channel you want to dispatch
Expand All @@ -43,7 +45,7 @@ type container struct {
type Argument struct {
ctr *container
prepared bool
sendto int
sendCnt int

// FuncId means the sendFunc
FuncId int
Expand All @@ -55,16 +57,22 @@ type Argument struct {

func (arg *Argument) Free(proc *process.Process, pipelineFailed bool) {
if arg.ctr.remoteReceivers != nil {
// TODO: how to handle pipelineFailed?
for _, r := range arg.ctr.remoteReceivers {
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*10000)
_ = cancel
message := cnclient.AcquireMessage()
{
message.Id = r.msgId
message.Cmd = pipeline.BatchMessage
message.Sid = pipeline.MessageEnd
message.Uuid = r.uuid[:]
}
r.cs.Write(r.ctx, message)
if pipelineFailed {
err := moerr.NewInternalErrorNoCtx("pipeline failed")
message.Err = pipeline.EncodedMessageError(timeoutCtx, err)
}
r.cs.Write(timeoutCtx, message)
close(r.doneCh)
}

}
Expand All @@ -88,4 +96,5 @@ func (arg *Argument) Free(proc *process.Process, pipelineFailed bool) {
}
close(arg.LocalRegs[i].Ch)
}

}
10 changes: 5 additions & 5 deletions pkg/sql/colexec/hashbuild/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ func (ctr *container) build(ap *Argument, proc *process.Process, anal process.An
continue
}
if v > rows {
ctr.sels = append(ctr.sels, make([]int64, 0, 8))
ctr.sels = append(ctr.sels, make([]int32, 0))
}
ai := int64(v) - 1
ctr.sels[ai] = append(ctr.sels[ai], int64(i+k))
ctr.sels[ai] = append(ctr.sels[ai], int32(i+k))
}
}
return nil
Expand All @@ -155,17 +155,17 @@ func (ctr *container) indexBuild() error {
// => dictionary = ["a"->1, "b"->2, "c"->3]
// => poses = [1, 2, 1, 3, 2, 3, 1, 1]
// sels = [[0, 2, 6, 7], [1, 4], [3, 5]]
ctr.sels = make([][]int64, index.MaxLowCardinality)
ctr.sels = make([][]int32, index.MaxLowCardinality)
poses := vector.MustTCols[uint16](ctr.idx.GetPoses())
for k, v := range poses {
if v == 0 {
continue
}
bucket := int(v) - 1
if len(ctr.sels[bucket]) == 0 {
ctr.sels[bucket] = make([]int64, 0, 64)
ctr.sels[bucket] = make([]int32, 0, 64)
}
ctr.sels[bucket] = append(ctr.sels[bucket], int64(k))
ctr.sels[bucket] = append(ctr.sels[bucket], int32(k))
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/hashbuild/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ func TestLowCardinalityBuild(t *testing.T) {
require.NotNil(t, mp.Index())

sels := mp.Sels()
require.Equal(t, []int64{0, 2, 6, 7}, sels[0])
require.Equal(t, []int64{1, 4}, sels[1])
require.Equal(t, []int64{3, 5}, sels[2])
require.Equal(t, []int32{0, 2, 6, 7}, sels[0])
require.Equal(t, []int32{1, 4}, sels[1])
require.Equal(t, []int32{3, 5}, sels[2])

mp.Free()
tc.proc.Reg.InputBatch.Clean(tc.proc.Mp())
Expand Down
15 changes: 8 additions & 7 deletions pkg/sql/colexec/hashbuild/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type container struct {

hasNull bool

sels [][]int64
sels [][]int32

bat *batch.Batch

Expand All @@ -55,12 +55,13 @@ type container struct {
type Argument struct {
ctr *container
// need to generate a push-down filter expression
NeedExpr bool
NeedHashMap bool
Ibucket uint64
Nbucket uint64
Typs []types.Type
Conditions []*plan.Expr
NeedExpr bool
NeedHashMap bool
NeedSelectList bool
Ibucket uint64
Nbucket uint64
Typs []types.Type
Conditions []*plan.Expr
}

func (arg *Argument) Free(proc *process.Process, pipelineFailed bool) {
Expand Down
Loading

0 comments on commit 6d4bd17

Please sign in to comment.