Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colexec: fix memory leak in simpleProjectOp #138804

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ go_library(
"//pkg/util", # keep
"//pkg/util/duration", # keep
"//pkg/util/json", # keep
"//pkg/util/log",
"//pkg/util/log", # keep
"//pkg/util/timeutil/pgdate", # keep
"//pkg/util/uuid", # keep
"@com_github_cockroachdb_apd_v3//:apd", # keep
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecbase/cast.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecbase/cast_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
_ = pgcode.Syntax
_ = pgdate.ParseTimestamp
_ = pgerror.Wrapf
_ = log.ExpensiveLogEnabled
)

// {{/*
Expand Down
40 changes: 18 additions & 22 deletions pkg/sql/colexec/colexecbase/simple_project.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// simpleProjectOp is an operator that implements "simple projection" - removal of
// columns that aren't needed by later operators.
type simpleProjectOp struct {
colexecop.NonExplainable
batches map[coldata.Batch]*projectingBatch
colexecop.OneInputInitCloserHelper

projection []uint32
// numBatchesLoggingThreshold is the threshold on the number of items in
// 'batches' map at which we will log a message when a new projectingBatch
// is created. It is growing exponentially.
numBatchesLoggingThreshold int
batch *projectingBatch
}

var _ colexecop.ClosableOperator = &simpleProjectOp{}
Expand Down Expand Up @@ -105,10 +101,8 @@ func NewSimpleProjectOp(
}
}
s := &simpleProjectOp{
OneInputInitCloserHelper: colexecop.MakeOneInputInitCloserHelper(input),
projection: make([]uint32, len(projection)),
batches: make(map[coldata.Batch]*projectingBatch),
numBatchesLoggingThreshold: 128,
OneInputInitCloserHelper: colexecop.MakeOneInputInitCloserHelper(input),
projection: make([]uint32, len(projection)),
}
// We make a copy of projection to be safe.
copy(s.projection, projection)
Expand All @@ -120,19 +114,21 @@ func (d *simpleProjectOp) Next() coldata.Batch {
if batch.Length() == 0 {
return coldata.ZeroBatch
}
projBatch, found := d.batches[batch]
if !found {
projBatch = newProjectionBatch(d.projection)
d.batches[batch] = projBatch
if len(d.batches) == d.numBatchesLoggingThreshold {
if log.V(1) {
log.Infof(d.Ctx, "simpleProjectOp: size of 'batches' map = %d", len(d.batches))
}
d.numBatchesLoggingThreshold = d.numBatchesLoggingThreshold * 2
}
if d.batch == nil || d.batch.Batch != batch {
// Create a fresh projection batch if we haven't created one already or
// if the input has returned a different underlying batch. The latter
// case can happen when the input dynamically grows the size of its
// batch, and we need to create a fresh projection batch since the last
// one might have been modified higher up in the tree (e.g. a vector
// could have been appended).
//
// The contract of Operator.Next encourages implementations to reuse the
// same batch, so we shouldn't hit this case often enough for this
// allocation to have a non-trivial impact.
d.batch = newProjectionBatch(d.projection)
}
projBatch.Batch = batch
return projBatch
d.batch.Batch = batch
return d.batch
}

func (d *simpleProjectOp) Reset(ctx context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecbase/simple_project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestSimpleProjectOpWithUnorderedSynchronizer(t *testing.T) {
for i := range parallelUnorderedSynchronizerInputs {
parallelUnorderedSynchronizerInputs[i].Root = inputs[i]
}
input = colexec.NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testMemAcc, parallelUnorderedSynchronizerInputs, &wg)
input = colexec.NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, inputTypes, parallelUnorderedSynchronizerInputs, &wg)
input = colexecbase.NewSimpleProjectOp(input, len(inputTypes), []uint32{0})
return colexecbase.NewConstOp(testAllocator, input, types.Int, constVal, 1)
})
Expand Down
22 changes: 19 additions & 3 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -96,6 +98,13 @@ type ParallelUnorderedSynchronizer struct {
// the corresponding to it input.
nextBatch []func()

// outputBatch is the output batch emitted by the synchronizer.
//
// The contract of Operator.Next encourages emitting the same batch across
// Next calls, so batches coming from the inputs are decomposed into vectors
// which are inserted into this batch.
outputBatch coldata.Batch

state int32
// externalWaitGroup refers to the WaitGroup passed in externally. Since the
// ParallelUnorderedSynchronizer spawns goroutines, this allows callers to
Expand Down Expand Up @@ -159,7 +168,8 @@ func hasParallelUnorderedSync(op execopnode.OpNode) bool {
func NewParallelUnorderedSynchronizer(
flowCtx *execinfra.FlowCtx,
processorID int32,
streamingMemAcc *mon.BoundAccount,
allocator *colmem.Allocator,
inputTypes []*types.T,
inputs []colexecargs.OpWithMetaInfo,
wg *sync.WaitGroup,
) *ParallelUnorderedSynchronizer {
Expand Down Expand Up @@ -206,13 +216,14 @@ func NewParallelUnorderedSynchronizer(
return &ParallelUnorderedSynchronizer{
flowCtx: flowCtx,
processorID: processorID,
streamingMemAcc: streamingMemAcc,
streamingMemAcc: allocator.Acc(),
inputs: inputs,
cancelInputsOnDrain: cancelInputs,
tracingSpans: make([]*tracing.Span, len(inputs)),
readNextBatch: readNextBatch,
batches: make([]coldata.Batch, len(inputs)),
nextBatch: make([]func(), len(inputs)),
outputBatch: allocator.NewMemBatchNoCols(inputTypes, coldata.BatchSize() /* capacity */),
externalWaitGroup: wg,
internalWaitGroup: &sync.WaitGroup{},
// batchCh is a buffered channel in order to offer non-blocking writes to
Expand Down Expand Up @@ -441,7 +452,11 @@ func (s *ParallelUnorderedSynchronizer) Next() coldata.Batch {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
continue
}
return msg.b
for i, vec := range msg.b.ColVecs() {
s.outputBatch.ReplaceCol(vec, i)
}
colexecutils.UpdateBatchState(s.outputBatch, msg.b.Length(), msg.b.Selection() != nil /* usesSel */, msg.b.Selection())
return s.outputBatch
}
}
}
Expand Down Expand Up @@ -565,6 +580,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada

// Done.
s.setState(parallelUnorderedSynchronizerStateDone)
s.outputBatch = nil
bufferedMeta := s.bufferedMeta
// Eagerly lose the reference to the metadata since it might be of
// non-trivial footprint.
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/colexec/parallel_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestParallelUnorderedSynchronizer(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())

var wg sync.WaitGroup
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testMemAcc, inputs, &wg)
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, typs, inputs, &wg)
s.Init(ctx)

t.Run(fmt.Sprintf("numInputs=%d/numBatches=%d/terminationScenario=%d", numInputs, numBatches, terminationScenario), func(t *testing.T) {
Expand Down Expand Up @@ -193,6 +193,7 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
// This code is unreachable, but the compiler cannot infer that.
return nil
}}
typs := types.OneIntCol
for i := 1; i < len(inputs); i++ {
acc := testMemMonitor.MakeBoundAccount()
defer acc.Close(ctx)
Expand All @@ -201,7 +202,7 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
NextCb: func() coldata.Batch {
// All inputs that do not encounter an error will continue to return
// batches.
b := allocator.NewMemBatchWithMaxCapacity([]*types.T{types.Int})
b := allocator.NewMemBatchWithMaxCapacity(typs)
b.SetLength(1)
return b
},
Expand All @@ -224,7 +225,7 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
}

var wg sync.WaitGroup
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testMemAcc, inputs, &wg)
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, typs, inputs, &wg)
s.Init(ctx)
for {
if err := colexecerror.CatchVectorizedRuntimeError(func() { _ = s.Next() }); err != nil {
Expand Down Expand Up @@ -256,7 +257,7 @@ func BenchmarkParallelUnorderedSynchronizer(b *testing.B) {
}
var wg sync.WaitGroup
ctx, cancelFn := context.WithCancel(context.Background())
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testMemAcc, inputs, &wg)
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, typs, inputs, &wg)
s.Init(ctx)
b.SetBytes(8 * int64(coldata.BatchSize()))
b.ResetTimer()
Expand Down
19 changes: 18 additions & 1 deletion pkg/sql/colexec/serial_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

Expand All @@ -30,6 +33,13 @@ type SerialUnorderedSynchronizer struct {
processorID int32
span *tracing.Span

// outputBatch is the output batch emitted by the synchronizer.
//
// The contract of Operator.Next encourages emitting the same batch across
// Next calls, so batches coming from the inputs are decomposed into vectors
// which are inserted into this batch.
outputBatch coldata.Batch

inputs []colexecargs.OpWithMetaInfo
// curSerialInputIdx indicates the index of the current input being consumed.
curSerialInputIdx int
Expand Down Expand Up @@ -65,13 +75,16 @@ func (s *SerialUnorderedSynchronizer) Child(nth int, verbose bool) execopnode.Op
func NewSerialUnorderedSynchronizer(
flowCtx *execinfra.FlowCtx,
processorID int32,
allocator *colmem.Allocator,
inputTypes []*types.T,
inputs []colexecargs.OpWithMetaInfo,
serialInputIdxExclusiveUpperBound uint32,
exceedsInputIdxExclusiveUpperBoundError error,
) *SerialUnorderedSynchronizer {
return &SerialUnorderedSynchronizer{
flowCtx: flowCtx,
processorID: processorID,
outputBatch: allocator.NewMemBatchNoCols(inputTypes, coldata.BatchSize() /* capacity */),
inputs: inputs,
serialInputIdxExclusiveUpperBound: serialInputIdxExclusiveUpperBound,
exceedsInputIdxExclusiveUpperBoundError: exceedsInputIdxExclusiveUpperBoundError,
Expand Down Expand Up @@ -102,7 +115,11 @@ func (s *SerialUnorderedSynchronizer) Next() coldata.Batch {
colexecerror.ExpectedError(s.exceedsInputIdxExclusiveUpperBoundError)
}
} else {
return b
for i, vec := range b.ColVecs() {
s.outputBatch.ReplaceCol(vec, i)
}
colexecutils.UpdateBatchState(s.outputBatch, b.Length(), b.Selection() != nil /* usesSel */, b.Selection())
return s.outputBatch
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/serial_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func TestSerialUnorderedSynchronizer(t *testing.T) {
s := NewSerialUnorderedSynchronizer(
&execinfra.FlowCtx{Gateway: true},
0, /* processorID */
testAllocator,
typs,
inputs,
0, /* serialInputIdxExclusiveUpperBound */
nil, /* exceedsInputIdxExclusiveUpperBoundError */
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexecop/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Operator interface {
// Calling Next may invalidate the contents of the last Batch returned by
// Next.
//
// Implementations should strive to reuse the same Batch across calls to
// Next, which should be possible as long as the capacity of the Batch
// hasn't changed.
//
// It might panic with an expected error, so there must be a "root"
// component that will catch that panic.
Next() coldata.Batch
Expand Down
20 changes: 11 additions & 9 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,14 +953,12 @@ func (s *vectorizedFlowCreator) setupInput(
opWithMetaInfo := inputStreamOps[0]
if len(inputStreamOps) > 1 {
statsInputs := inputStreamOps
allocator := colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory)
if input.Type == execinfrapb.InputSyncSpec_ORDERED {
os := colexec.NewOrderedSynchronizer(
flowCtx,
processorID,
colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory),
execinfra.GetWorkMemLimit(flowCtx), inputStreamOps,
input.ColumnTypes, execinfrapb.ConvertToColumnOrdering(input.Ordering),
0, /* tuplesToMerge */
flowCtx, processorID, allocator, execinfra.GetWorkMemLimit(flowCtx),
inputStreamOps, input.ColumnTypes,
execinfrapb.ConvertToColumnOrdering(input.Ordering), 0, /* tuplesToMerge */
)
opWithMetaInfo = colexecargs.OpWithMetaInfo{
Root: os,
Expand All @@ -975,7 +973,10 @@ func (s *vectorizedFlowCreator) setupInput(
err = execinfra.NewDynamicQueryHasNoHomeRegionError(err)
}
}
sync := colexec.NewSerialUnorderedSynchronizer(flowCtx, processorID, inputStreamOps, input.EnforceHomeRegionStreamExclusiveUpperBound, err)
sync := colexec.NewSerialUnorderedSynchronizer(
flowCtx, processorID, allocator, input.ColumnTypes, inputStreamOps,
input.EnforceHomeRegionStreamExclusiveUpperBound, err,
)
opWithMetaInfo = colexecargs.OpWithMetaInfo{
Root: sync,
MetadataSources: colexecop.MetadataSources{sync},
Expand All @@ -985,8 +986,9 @@ func (s *vectorizedFlowCreator) setupInput(
// Note that if we have opt == flowinfra.FuseAggressively, then we
// must use the serial unordered sync above in order to remove any
// concurrency.
streamingMemAcc := s.monitorRegistry.NewStreamingMemAccount(flowCtx)
sync := colexec.NewParallelUnorderedSynchronizer(flowCtx, processorID, streamingMemAcc, inputStreamOps, s.f.GetWaitGroup())
sync := colexec.NewParallelUnorderedSynchronizer(
flowCtx, processorID, allocator, input.ColumnTypes, inputStreamOps, s.f.GetWaitGroup(),
)
opWithMetaInfo = colexecargs.OpWithMetaInfo{
Root: sync,
MetadataSources: colexecop.MetadataSources{sync},
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestVectorizedFlowShutdown(t *testing.T) {
)
}
syncFlowCtx := &execinfra.FlowCtx{Local: false, Gateway: !addAnotherRemote}
synchronizer := colexec.NewParallelUnorderedSynchronizer(syncFlowCtx, 0 /* processorID */, testMemAcc, synchronizerInputs, &wg)
synchronizer := colexec.NewParallelUnorderedSynchronizer(syncFlowCtx, 0 /* processorID */, testAllocator, typs, synchronizerInputs, &wg)
inputMetadataSource := colexecop.MetadataSource(synchronizer)

runOutboxInbox := func(
Expand Down
Loading