Skip to content

Commit

Permalink
colexec: fix memory leak in simpleProjectOp
Browse files Browse the repository at this point in the history
This commit fixes a bounded memory leak in `simpleProjectOp` that has
been present since 20.2 version. The leak was introduced via the
combination of
- 57eb4f8 in which we began tracking
all batches seen by the operator so that we could decide whether we
need to allocate a fresh projecting batch or not
- 895125b in which we started using the
dynamic sizing for batches (where we'd start with size 1 and grow
exponentially until 1024 while previously we would always use 1024).

Both changes combined made it so that the `simpleProjectOp` would keep
_all_ batches with different sizes alive until the query shutdown. The
problem later got more exacerbated when we introduced dynamic batch
sizing all over the place (for example, in the spilling queue).

Let's discuss the reasoning for why we needed something like the
tracking we did in the first change mentioned above. The simple project
op works by putting a small wrapper (a "lense") `projectingBatch` over
the batch coming from the input. That wrapper can be modified later by
other operators (for example, a new vector might be appended), which
would also modify the original batch coming from the input, so we need
to allow for the wrapper to be mutated accordingly. At the same time,
the input operator might decide to produce a fresh batch (for example,
because of the dynamically growing size) which wouldn't have the same
"upstream" mutation applied to it, so we need to allocate a fresh
projecting batch and let the upstream do its modifications. In the first
change we solved it by keeping a map from the input batch to the
projecting batch.

This commit addresses the same issue by only checking whether the input
batch is the same one as was seen by the `simpleProjectOp` on the
previous call. If they are the same, then we can reuse the last
projecting batch; otherwise, we need to allocate a fresh one and memoize
it. In other words, we effectively reduce the map to have at most one
entry.

This means that with dynamic batch size growth we'd create a few
`projectingBatch` wrappers, but that is ok given that we expect the
dynamic size heuristics to quickly settle on the size.

Before we introduced the dynamic batch sizing, different batches could
only be produced by the unordered synchronizers, so that's another place
this commit adjusts. If we didn't do anything, then the simplistic
mapping with at most one entry could result in "thrashing" - i.e. in the
extreme case where the inputs to the synchronizer would produce batches
in round-robin fashion, we'd end up creating a new `projectingBatch`
every time which would be quite wasteful. In this commit we modify both
the parallel and the serial unordered synchronizers to always emit the
same output batch which is populated by manually inserting vectors from
the input batch.

Release note (bug fix): Bounded memory leak that could previously occur
when evaluating some memory-intensive queries via the vectorized engine
in CockroachDB has now been fixed. The leak has been present since 20.2
version.
  • Loading branch information
yuzefovich committed Jan 14, 2025
1 parent 2e25969 commit f3b94a4
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 42 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ go_library(
"//pkg/util", # keep
"//pkg/util/duration", # keep
"//pkg/util/json", # keep
"//pkg/util/log",
"//pkg/util/log", # keep
"//pkg/util/timeutil/pgdate", # keep
"//pkg/util/uuid", # keep
"@com_github_cockroachdb_apd_v3//:apd", # keep
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecbase/cast_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
_ = pgcode.Syntax
_ = pgdate.ParseTimestamp
_ = pgerror.Wrapf
_ = log.ExpensiveLogEnabled
)

// {{/*
Expand Down
36 changes: 14 additions & 22 deletions pkg/sql/colexec/colexecbase/simple_project.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// simpleProjectOp is an operator that implements "simple projection" - removal of
// columns that aren't needed by later operators.
type simpleProjectOp struct {
colexecop.NonExplainable
batches map[coldata.Batch]*projectingBatch
colexecop.OneInputInitCloserHelper

projection []uint32
// numBatchesLoggingThreshold is the threshold on the number of items in
// 'batches' map at which we will log a message when a new projectingBatch
// is created. It is growing exponentially.
numBatchesLoggingThreshold int
batch *projectingBatch
}

var _ colexecop.ClosableOperator = &simpleProjectOp{}
Expand Down Expand Up @@ -105,10 +101,8 @@ func NewSimpleProjectOp(
}
}
s := &simpleProjectOp{
OneInputInitCloserHelper: colexecop.MakeOneInputInitCloserHelper(input),
projection: make([]uint32, len(projection)),
batches: make(map[coldata.Batch]*projectingBatch),
numBatchesLoggingThreshold: 128,
OneInputInitCloserHelper: colexecop.MakeOneInputInitCloserHelper(input),
projection: make([]uint32, len(projection)),
}
// We make a copy of projection to be safe.
copy(s.projection, projection)
Expand All @@ -120,19 +114,17 @@ func (d *simpleProjectOp) Next() coldata.Batch {
if batch.Length() == 0 {
return coldata.ZeroBatch
}
projBatch, found := d.batches[batch]
if !found {
projBatch = newProjectionBatch(d.projection)
d.batches[batch] = projBatch
if len(d.batches) == d.numBatchesLoggingThreshold {
if log.V(1) {
log.Infof(d.Ctx, "simpleProjectOp: size of 'batches' map = %d", len(d.batches))
}
d.numBatchesLoggingThreshold = d.numBatchesLoggingThreshold * 2
}
if d.batch == nil || d.batch.Batch != batch {
// Create a fresh projection batch if we haven't created one already or
// if we see a different "internally" batch coming from the input. The
// latter case can happen during dynamically growing the size of the
// batch in the input, and we need to create a fresh projection batch
// since the last one might have been modified higher up in the tree
// (e.g. a vector could have been appended).
d.batch = newProjectionBatch(d.projection)
}
projBatch.Batch = batch
return projBatch
d.batch.Batch = batch
return d.batch
}

func (d *simpleProjectOp) Reset(ctx context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecbase/simple_project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestSimpleProjectOpWithUnorderedSynchronizer(t *testing.T) {
for i := range parallelUnorderedSynchronizerInputs {
parallelUnorderedSynchronizerInputs[i].Root = inputs[i]
}
input = colexec.NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testMemAcc, parallelUnorderedSynchronizerInputs, &wg)
input = colexec.NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, inputTypes, parallelUnorderedSynchronizerInputs, &wg)
input = colexecbase.NewSimpleProjectOp(input, len(inputTypes), []uint32{0})
return colexecbase.NewConstOp(testAllocator, input, types.Int, constVal, 1)
})
Expand Down
23 changes: 20 additions & 3 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -96,6 +98,14 @@ type ParallelUnorderedSynchronizer struct {
// the corresponding to it input.
nextBatch []func()

// outputBatch is the output batch emitted by the synchronizer. Batches
// coming from the inputs are decomposed into vectors which are inserted
// into this batch. This is needed to simplify the job of the
// simpleProjectOp which needs to decide whether to allocate a fresh
// projectingBatch or not (which it needs to do whenever it observes a
// particular batch for the first time).
outputBatch coldata.Batch

state int32
// externalWaitGroup refers to the WaitGroup passed in externally. Since the
// ParallelUnorderedSynchronizer spawns goroutines, this allows callers to
Expand Down Expand Up @@ -159,7 +169,8 @@ func hasParallelUnorderedSync(op execopnode.OpNode) bool {
func NewParallelUnorderedSynchronizer(
flowCtx *execinfra.FlowCtx,
processorID int32,
streamingMemAcc *mon.BoundAccount,
allocator *colmem.Allocator,
inputTypes []*types.T,
inputs []colexecargs.OpWithMetaInfo,
wg *sync.WaitGroup,
) *ParallelUnorderedSynchronizer {
Expand Down Expand Up @@ -206,13 +217,14 @@ func NewParallelUnorderedSynchronizer(
return &ParallelUnorderedSynchronizer{
flowCtx: flowCtx,
processorID: processorID,
streamingMemAcc: streamingMemAcc,
streamingMemAcc: allocator.Acc(),
inputs: inputs,
cancelInputsOnDrain: cancelInputs,
tracingSpans: make([]*tracing.Span, len(inputs)),
readNextBatch: readNextBatch,
batches: make([]coldata.Batch, len(inputs)),
nextBatch: make([]func(), len(inputs)),
outputBatch: allocator.NewMemBatchNoCols(inputTypes, coldata.BatchSize() /* capacity */),
externalWaitGroup: wg,
internalWaitGroup: &sync.WaitGroup{},
// batchCh is a buffered channel in order to offer non-blocking writes to
Expand Down Expand Up @@ -441,7 +453,11 @@ func (s *ParallelUnorderedSynchronizer) Next() coldata.Batch {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
continue
}
return msg.b
for i, vec := range msg.b.ColVecs() {
s.outputBatch.ReplaceCol(vec, i)
}
colexecutils.UpdateBatchState(s.outputBatch, msg.b.Length(), msg.b.Selection() != nil /* usesSel */, msg.b.Selection())
return s.outputBatch
}
}
}
Expand Down Expand Up @@ -565,6 +581,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada

// Done.
s.setState(parallelUnorderedSynchronizerStateDone)
s.outputBatch = nil
bufferedMeta := s.bufferedMeta
// Eagerly lose the reference to the metadata since it might be of
// non-trivial footprint.
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/colexec/parallel_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestParallelUnorderedSynchronizer(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())

var wg sync.WaitGroup
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testMemAcc, inputs, &wg)
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, typs, inputs, &wg)
s.Init(ctx)

t.Run(fmt.Sprintf("numInputs=%d/numBatches=%d/terminationScenario=%d", numInputs, numBatches, terminationScenario), func(t *testing.T) {
Expand Down Expand Up @@ -193,6 +193,7 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
// This code is unreachable, but the compiler cannot infer that.
return nil
}}
typs := types.OneIntCol
for i := 1; i < len(inputs); i++ {
acc := testMemMonitor.MakeBoundAccount()
defer acc.Close(ctx)
Expand All @@ -201,7 +202,7 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
NextCb: func() coldata.Batch {
// All inputs that do not encounter an error will continue to return
// batches.
b := allocator.NewMemBatchWithMaxCapacity([]*types.T{types.Int})
b := allocator.NewMemBatchWithMaxCapacity(typs)
b.SetLength(1)
return b
},
Expand All @@ -224,7 +225,7 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
}

var wg sync.WaitGroup
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testMemAcc, inputs, &wg)
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, typs, inputs, &wg)
s.Init(ctx)
for {
if err := colexecerror.CatchVectorizedRuntimeError(func() { _ = s.Next() }); err != nil {
Expand Down Expand Up @@ -256,7 +257,7 @@ func BenchmarkParallelUnorderedSynchronizer(b *testing.B) {
}
var wg sync.WaitGroup
ctx, cancelFn := context.WithCancel(context.Background())
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testMemAcc, inputs, &wg)
s := NewParallelUnorderedSynchronizer(&execinfra.FlowCtx{Local: true, Gateway: true}, 0 /* processorID */, testAllocator, typs, inputs, &wg)
s.Init(ctx)
b.SetBytes(8 * int64(coldata.BatchSize()))
b.ResetTimer()
Expand Down
20 changes: 19 additions & 1 deletion pkg/sql/colexec/serial_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

Expand All @@ -30,6 +33,14 @@ type SerialUnorderedSynchronizer struct {
processorID int32
span *tracing.Span

// outputBatch is the output batch emitted by the synchronizer. Batches
// coming from the inputs are decomposed into vectors which are inserted
// into this batch. This is needed to simplify the job of the
// simpleProjectOp which needs to decide whether to allocate a fresh
// projectingBatch or not (which it needs to do whenever it observes a
// particular batch for the first time).
outputBatch coldata.Batch

inputs []colexecargs.OpWithMetaInfo
// curSerialInputIdx indicates the index of the current input being consumed.
curSerialInputIdx int
Expand Down Expand Up @@ -65,13 +76,16 @@ func (s *SerialUnorderedSynchronizer) Child(nth int, verbose bool) execopnode.Op
func NewSerialUnorderedSynchronizer(
flowCtx *execinfra.FlowCtx,
processorID int32,
allocator *colmem.Allocator,
inputTypes []*types.T,
inputs []colexecargs.OpWithMetaInfo,
serialInputIdxExclusiveUpperBound uint32,
exceedsInputIdxExclusiveUpperBoundError error,
) *SerialUnorderedSynchronizer {
return &SerialUnorderedSynchronizer{
flowCtx: flowCtx,
processorID: processorID,
outputBatch: allocator.NewMemBatchNoCols(inputTypes, coldata.BatchSize() /* capacity */),
inputs: inputs,
serialInputIdxExclusiveUpperBound: serialInputIdxExclusiveUpperBound,
exceedsInputIdxExclusiveUpperBoundError: exceedsInputIdxExclusiveUpperBoundError,
Expand Down Expand Up @@ -102,7 +116,11 @@ func (s *SerialUnorderedSynchronizer) Next() coldata.Batch {
colexecerror.ExpectedError(s.exceedsInputIdxExclusiveUpperBoundError)
}
} else {
return b
for i, vec := range b.ColVecs() {
s.outputBatch.ReplaceCol(vec, i)
}
colexecutils.UpdateBatchState(s.outputBatch, b.Length(), b.Selection() != nil /* usesSel */, b.Selection())
return s.outputBatch
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/serial_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func TestSerialUnorderedSynchronizer(t *testing.T) {
s := NewSerialUnorderedSynchronizer(
&execinfra.FlowCtx{Gateway: true},
0, /* processorID */
testAllocator,
typs,
inputs,
0, /* serialInputIdxExclusiveUpperBound */
nil, /* exceedsInputIdxExclusiveUpperBoundError */
Expand Down
20 changes: 11 additions & 9 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,14 +953,12 @@ func (s *vectorizedFlowCreator) setupInput(
opWithMetaInfo := inputStreamOps[0]
if len(inputStreamOps) > 1 {
statsInputs := inputStreamOps
allocator := colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory)
if input.Type == execinfrapb.InputSyncSpec_ORDERED {
os := colexec.NewOrderedSynchronizer(
flowCtx,
processorID,
colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory),
execinfra.GetWorkMemLimit(flowCtx), inputStreamOps,
input.ColumnTypes, execinfrapb.ConvertToColumnOrdering(input.Ordering),
0, /* tuplesToMerge */
flowCtx, processorID, allocator, execinfra.GetWorkMemLimit(flowCtx),
inputStreamOps, input.ColumnTypes,
execinfrapb.ConvertToColumnOrdering(input.Ordering), 0, /* tuplesToMerge */
)
opWithMetaInfo = colexecargs.OpWithMetaInfo{
Root: os,
Expand All @@ -975,7 +973,10 @@ func (s *vectorizedFlowCreator) setupInput(
err = execinfra.NewDynamicQueryHasNoHomeRegionError(err)
}
}
sync := colexec.NewSerialUnorderedSynchronizer(flowCtx, processorID, inputStreamOps, input.EnforceHomeRegionStreamExclusiveUpperBound, err)
sync := colexec.NewSerialUnorderedSynchronizer(
flowCtx, processorID, allocator, input.ColumnTypes, inputStreamOps,
input.EnforceHomeRegionStreamExclusiveUpperBound, err,
)
opWithMetaInfo = colexecargs.OpWithMetaInfo{
Root: sync,
MetadataSources: colexecop.MetadataSources{sync},
Expand All @@ -985,8 +986,9 @@ func (s *vectorizedFlowCreator) setupInput(
// Note that if we have opt == flowinfra.FuseAggressively, then we
// must use the serial unordered sync above in order to remove any
// concurrency.
streamingMemAcc := s.monitorRegistry.NewStreamingMemAccount(flowCtx)
sync := colexec.NewParallelUnorderedSynchronizer(flowCtx, processorID, streamingMemAcc, inputStreamOps, s.f.GetWaitGroup())
sync := colexec.NewParallelUnorderedSynchronizer(
flowCtx, processorID, allocator, input.ColumnTypes, inputStreamOps, s.f.GetWaitGroup(),
)
opWithMetaInfo = colexecargs.OpWithMetaInfo{
Root: sync,
MetadataSources: colexecop.MetadataSources{sync},
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestVectorizedFlowShutdown(t *testing.T) {
)
}
syncFlowCtx := &execinfra.FlowCtx{Local: false, Gateway: !addAnotherRemote}
synchronizer := colexec.NewParallelUnorderedSynchronizer(syncFlowCtx, 0 /* processorID */, testMemAcc, synchronizerInputs, &wg)
synchronizer := colexec.NewParallelUnorderedSynchronizer(syncFlowCtx, 0 /* processorID */, testAllocator, typs, synchronizerInputs, &wg)
inputMetadataSource := colexecop.MetadataSource(synchronizer)

runOutboxInbox := func(
Expand Down

0 comments on commit f3b94a4

Please sign in to comment.