diff --git a/pkg/common/malloc/allocator.go b/pkg/common/malloc/allocator.go index c83667a1ccff3..6273a0f6d4e28 100644 --- a/pkg/common/malloc/allocator.go +++ b/pkg/common/malloc/allocator.go @@ -14,12 +14,10 @@ package malloc -import "unsafe" - type Allocator interface { - Allocate(size uint64, hint Hints) (unsafe.Pointer, Deallocator, error) + Allocate(size uint64, hint Hints) ([]byte, Deallocator, error) } type Deallocator interface { - Deallocate(ptr unsafe.Pointer, hint Hints) + Deallocate(hint Hints) } diff --git a/pkg/common/malloc/allocator_bench_test.go b/pkg/common/malloc/allocator_bench_test.go index fcec4d571fae3..81eddadabd3e4 100644 --- a/pkg/common/malloc/allocator_bench_test.go +++ b/pkg/common/malloc/allocator_bench_test.go @@ -33,11 +33,11 @@ func benchmarkAllocator( allcator := newAllocator() b.ResetTimer() for i := 0; i < b.N; i++ { - ptr, dec, err := allcator.Allocate(n, NoHints) + _, dec, err := allcator.Allocate(n, NoHints) if err != nil { b.Fatal(err) } - dec.Deallocate(ptr, NoHints) + dec.Deallocate(NoHints) } }) @@ -46,11 +46,11 @@ func benchmarkAllocator( b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - ptr, dec, err := allcator.Allocate(n, NoHints) + _, dec, err := allcator.Allocate(n, NoHints) if err != nil { b.Fatal(err) } - dec.Deallocate(ptr, NoHints) + dec.Deallocate(NoHints) } }) }) diff --git a/pkg/common/malloc/allocator_test.go b/pkg/common/malloc/allocator_test.go index 6c23897436660..ca93c8644835e 100644 --- a/pkg/common/malloc/allocator_test.go +++ b/pkg/common/malloc/allocator_test.go @@ -17,7 +17,6 @@ package malloc import ( "math" "testing" - "unsafe" ) func testAllocator( @@ -29,20 +28,40 @@ func testAllocator( t.Run("allocate", func(t *testing.T) { allocator := newAllocator() for i := uint64(1); i < 128*MB; i = uint64(math.Ceil(float64(i) * 1.1)) { - ptr, dec, err := allocator.Allocate(i, NoHints) + // allocate + slice, dec, err := allocator.Allocate(i, NoHints) if err != nil { t.Fatal(err) } - slice := unsafe.Slice((*byte)(ptr), i) + // len + if len(slice) != int(i) { + t.Fatal() + } + // read for _, i := range slice { if i != 0 { t.Fatal("not zeroed") } } + // write for i := range slice { slice[i] = byte(i) } - dec.Deallocate(ptr, NoHints) + // read + for i := range slice { + if slice[i] != byte(i) { + t.Fatal() + } + } + // slice + slice = slice[:len(slice)/2] + for i := range slice { + if slice[i] != byte(i) { + t.Fatal() + } + } + // deallocate + dec.Deallocate(NoHints) } }) diff --git a/pkg/common/malloc/c_allocator.go b/pkg/common/malloc/c_allocator.go index ad3ce92f99fff..a9e1803f40889 100644 --- a/pkg/common/malloc/c_allocator.go +++ b/pkg/common/malloc/c_allocator.go @@ -22,24 +22,32 @@ import "unsafe" import "C" type CAllocator struct { + deallocatorPool *ClosureDeallocatorPool[cDeallocatorArgs] +} + +type cDeallocatorArgs struct { + ptr unsafe.Pointer } func NewCAllocator() *CAllocator { - return &CAllocator{} + return &CAllocator{ + deallocatorPool: NewClosureDeallocatorPool( + func(hints Hints, args cDeallocatorArgs) { + C.free(args.ptr) + }, + ), + } } var _ Allocator = new(CAllocator) -func (c *CAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, Deallocator, error) { +func (c *CAllocator) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) { ptr := C.malloc(C.ulong(size)) if hints&NoClear == 0 { clear(unsafe.Slice((*byte)(ptr), size)) } - return ptr, c, nil -} - -var _ Deallocator = new(CAllocator) - -func (c *CAllocator) Deallocate(ptr unsafe.Pointer, hints Hints) { - C.free(ptr) + slice := unsafe.Slice((*byte)(ptr), size) + return slice, c.deallocatorPool.Get(cDeallocatorArgs{ + ptr: ptr, + }), nil } diff --git a/pkg/common/malloc/chain_deallocator.go b/pkg/common/malloc/chain_deallocator.go index 4be388565d7e1..e7a65a72d7012 100644 --- a/pkg/common/malloc/chain_deallocator.go +++ b/pkg/common/malloc/chain_deallocator.go @@ -16,16 +16,15 @@ package malloc import ( "sync" - "unsafe" ) type chainDeallocator []Deallocator var _ Deallocator = &chainDeallocator{} -func (c *chainDeallocator) Deallocate(ptr unsafe.Pointer, hints Hints) { +func (c *chainDeallocator) Deallocate(hints Hints) { for i := len(*c) - 1; i >= 0; i-- { - (*c)[i].Deallocate(ptr, hints) + (*c)[i].Deallocate(hints) } *c = (*c)[:0] chainDeallocatorPool.Put(c) diff --git a/pkg/common/malloc/checked_allocator.go b/pkg/common/malloc/checked_allocator.go index 93ee282eeb51e..600d8641d8ec2 100644 --- a/pkg/common/malloc/checked_allocator.go +++ b/pkg/common/malloc/checked_allocator.go @@ -17,15 +17,14 @@ package malloc import ( "fmt" "runtime" - "sync" "sync/atomic" "unsafe" ) type CheckedAllocator struct { - upstream Allocator - fraction uint32 - funcPool sync.Pool + upstream Allocator + fraction uint32 + deallocatorPool *ClosureDeallocatorPool[checkedAllocatorArgs] } type checkedAllocatorArgs struct { @@ -33,43 +32,38 @@ type checkedAllocatorArgs struct { deallocator Deallocator stackID uint64 size uint64 + ptr unsafe.Pointer } func NewCheckedAllocator(upstream Allocator, fraction uint32) *CheckedAllocator { - ret := &CheckedAllocator{ + return &CheckedAllocator{ upstream: upstream, fraction: fraction, - } - ret.funcPool = sync.Pool{ - New: func() any { - argumented := new(argumentedFuncDeallocator[checkedAllocatorArgs]) - argumented.fn = func(ptr unsafe.Pointer, hints Hints, args checkedAllocatorArgs) { + deallocatorPool: NewClosureDeallocatorPool( + func(hints Hints, args checkedAllocatorArgs) { if !args.deallocated.CompareAndSwap(false, true) { panic(fmt.Sprintf( "double free: address %p, size %v, allocated at %s", - ptr, + args.ptr, args.size, stackInfo(args.stackID), )) } hints |= DoNotReuse - args.deallocator.Deallocate(ptr, hints) + args.deallocator.Deallocate(hints) - ret.funcPool.Put(argumented) - } - return argumented - }, + }, + ), } - return ret } var _ Allocator = new(CheckedAllocator) -func (c *CheckedAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, Deallocator, error) { +func (c *CheckedAllocator) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) { ptr, dec, err := c.upstream.Allocate(size, hints) if err != nil { return nil, nil, err @@ -92,12 +86,13 @@ func (c *CheckedAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, D } }) - fn := c.funcPool.Get().(*argumentedFuncDeallocator[checkedAllocatorArgs]) - fn.SetArgument(checkedAllocatorArgs{ + dec = c.deallocatorPool.Get(checkedAllocatorArgs{ deallocated: deallocated, deallocator: dec, stackID: stackID, size: size, + ptr: unsafe.Pointer(unsafe.SliceData(ptr)), }) - return ptr, fn, nil + + return ptr, dec, nil } diff --git a/pkg/common/malloc/checked_allocator_test.go b/pkg/common/malloc/checked_allocator_test.go index 8d3ef25ace159..91066edec9ccc 100644 --- a/pkg/common/malloc/checked_allocator_test.go +++ b/pkg/common/malloc/checked_allocator_test.go @@ -19,7 +19,6 @@ import ( "runtime" "strings" "testing" - "unsafe" ) func TestCheckedAllocator(t *testing.T) { @@ -42,7 +41,7 @@ func TestCheckedAllocator(t *testing.T) { } // comment the following line to trigger a missing-free panic // this panic will be raised in SetFinalizer func so it's not recoverable and not testable - dec.Deallocate(ptr, NoHints) + dec.Deallocate(NoHints) _ = ptr _ = dec runtime.GC() @@ -64,12 +63,12 @@ func TestCheckedAllocator(t *testing.T) { NewClassAllocator(NewFixedSizeMmapAllocator), 1, ) - ptr, dec, err := allocator.Allocate(42, NoHints) + _, dec, err := allocator.Allocate(42, NoHints) if err != nil { t.Fatal(err) } - dec.Deallocate(ptr, NoHints) - dec.Deallocate(ptr, NoHints) + dec.Deallocate(NoHints) + dec.Deallocate(NoHints) }) // use after free @@ -78,15 +77,14 @@ func TestCheckedAllocator(t *testing.T) { NewClassAllocator(NewFixedSizeMmapAllocator), 1, ) - ptr, dec, err := allocator.Allocate(42, NoHints) + slice, dec, err := allocator.Allocate(42, NoHints) if err != nil { t.Fatal(err) } - slice := unsafe.Slice((*byte)(ptr), 42) for i := range slice { slice[i] = byte(i) } - dec.Deallocate(ptr, NoHints) + dec.Deallocate(NoHints) // zero or segfault //for i := range slice { // if slice[i] != 0 { diff --git a/pkg/common/malloc/class_allocator.go b/pkg/common/malloc/class_allocator.go index c0cd36f45cf71..31384b1d2f776 100644 --- a/pkg/common/malloc/class_allocator.go +++ b/pkg/common/malloc/class_allocator.go @@ -16,7 +16,6 @@ package malloc import ( "math/bits" - "unsafe" "github.com/matrixorigin/matrixone/pkg/common/moerr" ) @@ -52,7 +51,7 @@ func NewClassAllocator[T FixedSizeAllocator]( var _ Allocator = new(ClassAllocator[*fixedSizeMmapAllocator]) -func (c *ClassAllocator[T]) Allocate(size uint64, hints Hints) (unsafe.Pointer, Deallocator, error) { +func (c *ClassAllocator[T]) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) { if size == 0 { return nil, nil, moerr.NewInternalErrorNoCtx("invalid allocate size: 0") } @@ -68,5 +67,10 @@ func (c *ClassAllocator[T]) Allocate(size uint64, hints Hints) (unsafe.Pointer, if i >= len(c.classes) { return nil, nil, moerr.NewInternalErrorNoCtx("cannot allocate %v bytes: too large", size) } - return c.classes[i].allocator.Allocate(hints) + slice, dec, err := c.classes[i].allocator.Allocate(hints) + if err != nil { + return nil, nil, err + } + slice = slice[:size] + return slice, dec, nil } diff --git a/pkg/common/malloc/closure_deallocator.go b/pkg/common/malloc/closure_deallocator.go new file mode 100644 index 0000000000000..d2b01f52e2af4 --- /dev/null +++ b/pkg/common/malloc/closure_deallocator.go @@ -0,0 +1,61 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package malloc + +import ( + "sync" +) + +type ClosureDeallocator[T any] struct { + argument T + fn func(Hints, T) +} + +func (a *ClosureDeallocator[T]) SetArgument(arg T) { + a.argument = arg +} + +var _ Deallocator = &ClosureDeallocator[int]{} + +func (a *ClosureDeallocator[T]) Deallocate(hints Hints) { + a.fn(hints, a.argument) +} + +type ClosureDeallocatorPool[T any] struct { + pool sync.Pool +} + +func NewClosureDeallocatorPool[T any]( + deallocateFunc func(Hints, T), +) *ClosureDeallocatorPool[T] { + ret := new(ClosureDeallocatorPool[T]) + + ret.pool.New = func() any { + closure := new(ClosureDeallocator[T]) + closure.fn = func(hints Hints, args T) { + deallocateFunc(hints, args) + ret.pool.Put(closure) + } + return closure + } + + return ret +} + +func (c *ClosureDeallocatorPool[T]) Get(args T) Deallocator { + closure := c.pool.Get().(*ClosureDeallocator[T]) + closure.SetArgument(args) + return closure +} diff --git a/pkg/common/malloc/fixed_size_allocator.go b/pkg/common/malloc/fixed_size_allocator.go index 573a134a2cd5b..50d2644b81690 100644 --- a/pkg/common/malloc/fixed_size_allocator.go +++ b/pkg/common/malloc/fixed_size_allocator.go @@ -14,8 +14,6 @@ package malloc -import "unsafe" - type FixedSizeAllocator interface { - Allocate(hint Hints) (unsafe.Pointer, Deallocator, error) + Allocate(hint Hints) ([]byte, Deallocator, error) } diff --git a/pkg/common/malloc/fixed_size_mmap_allocator.go b/pkg/common/malloc/fixed_size_mmap_allocator.go index 2707a31692c32..c122424c155f1 100644 --- a/pkg/common/malloc/fixed_size_mmap_allocator.go +++ b/pkg/common/malloc/fixed_size_mmap_allocator.go @@ -35,37 +35,87 @@ type fixedSizeMmapAllocator struct { buffer1 chan unsafe.Pointer // buffer2 buffers MADV_DONTNEED objects buffer2 chan unsafe.Pointer + + deallocatorPool *ClosureDeallocatorPool[fixedSizeMmapDeallocatorArgs] +} + +type fixedSizeMmapDeallocatorArgs struct { + ptr unsafe.Pointer } func NewFixedSizeMmapAllocator( size uint64, -) *fixedSizeMmapAllocator { +) (ret *fixedSizeMmapAllocator) { + // if size is larger than smallClassCap, num1 will be zero, buffer1 will be empty num1 := smallClassCap / size if num1 > maxBuffer1Cap { // don't buffer too much, since chans with larger buffer consume more memory num1 = maxBuffer1Cap } - ret := &fixedSizeMmapAllocator{ + + ret = &fixedSizeMmapAllocator{ size: size, buffer1: make(chan unsafe.Pointer, num1), buffer2: make(chan unsafe.Pointer, buffer2Cap), + + deallocatorPool: NewClosureDeallocatorPool( + func(hints Hints, args fixedSizeMmapDeallocatorArgs) { + + if hints&DoNotReuse > 0 { + if err := unix.Munmap( + unsafe.Slice((*byte)(args.ptr), size), + ); err != nil { + panic(err) + } + return + } + + select { + + case ret.buffer1 <- args.ptr: + // buffer in buffer1 + + default: + + ret.freeMem(args.ptr) + + select { + + case ret.buffer2 <- args.ptr: + // buffer in buffer2 + + default: + // unmap + if err := unix.Munmap( + unsafe.Slice((*byte)(args.ptr), size), + ); err != nil { + panic(err) + } + + } + + } + + }, + ), } + return ret } var _ FixedSizeAllocator = new(fixedSizeMmapAllocator) -func (f *fixedSizeMmapAllocator) Allocate(hints Hints) (ptr unsafe.Pointer, dec Deallocator, err error) { +func (f *fixedSizeMmapAllocator) Allocate(hints Hints) (slice []byte, dec Deallocator, err error) { select { case ptr := <-f.buffer1: // from buffer1 + slice = unsafe.Slice((*byte)(ptr), f.size) if hints&NoClear == 0 { - clear(unsafe.Slice((*byte)(ptr), f.size)) + clear(slice) } - return ptr, f, nil default: @@ -74,11 +124,11 @@ func (f *fixedSizeMmapAllocator) Allocate(hints Hints) (ptr unsafe.Pointer, dec case ptr := <-f.buffer2: // from buffer2 f.reuseMem(ptr, hints) - return ptr, f, nil + slice = unsafe.Slice((*byte)(ptr), f.size) default: // allocate new - data, err := unix.Mmap( + slice, err = unix.Mmap( -1, 0, int(f.size), unix.PROT_READ|unix.PROT_WRITE, @@ -87,50 +137,12 @@ func (f *fixedSizeMmapAllocator) Allocate(hints Hints) (ptr unsafe.Pointer, dec if err != nil { return nil, nil, err } - return unsafe.Pointer(unsafe.SliceData(data)), f, nil - - } - - } -} - -var _ Deallocator = new(fixedSizeMmapAllocator) - -func (f *fixedSizeMmapAllocator) Deallocate(ptr unsafe.Pointer, hints Hints) { - - if hints&DoNotReuse > 0 { - if err := unix.Munmap( - unsafe.Slice((*byte)(ptr), f.size), - ); err != nil { - panic(err) - } - return - } - - select { - - case f.buffer1 <- ptr: - // buffer in buffer1 - - default: - - f.freeMem(ptr) - - select { - - case f.buffer2 <- ptr: - // buffer in buffer2 - - default: - // unmap - if err := unix.Munmap( - unsafe.Slice((*byte)(ptr), f.size), - ); err != nil { - panic(err) - } } } + return slice, f.deallocatorPool.Get(fixedSizeMmapDeallocatorArgs{ + ptr: unsafe.Pointer(unsafe.SliceData(slice)), + }), nil } diff --git a/pkg/common/malloc/fixed_size_sync_pool_allocator.go b/pkg/common/malloc/fixed_size_sync_pool_allocator.go index 59b753cf29a53..1b213a74b533d 100644 --- a/pkg/common/malloc/fixed_size_sync_pool_allocator.go +++ b/pkg/common/malloc/fixed_size_sync_pool_allocator.go @@ -16,41 +16,50 @@ package malloc import ( "sync" - "unsafe" ) type fixedSizeSyncPoolAllocator struct { - size uint64 - pool sync.Pool + size uint64 + pool sync.Pool + deallocatorPool *ClosureDeallocatorPool[fixedSizeSyncPoolDeallocatorArgs] } -func NewFixedSizeSyncPoolAllocator(size uint64) *fixedSizeSyncPoolAllocator { - return &fixedSizeSyncPoolAllocator{ +type fixedSizeSyncPoolDeallocatorArgs struct { + slice *[]byte +} + +func NewFixedSizeSyncPoolAllocator(size uint64) (ret *fixedSizeSyncPoolAllocator) { + ret = &fixedSizeSyncPoolAllocator{ size: size, + pool: sync.Pool{ New: func() any { - ptr := unsafe.Pointer(unsafe.SliceData(make([]byte, size))) - return ptr + slice := make([]byte, size) + return &slice }, }, + + deallocatorPool: NewClosureDeallocatorPool( + func(hint Hints, args fixedSizeSyncPoolDeallocatorArgs) { + if hint&DoNotReuse > 0 { + return + } + ret.pool.Put(args.slice) + }, + ), } + + return } var _ FixedSizeAllocator = new(fixedSizeSyncPoolAllocator) -func (f *fixedSizeSyncPoolAllocator) Allocate(hint Hints) (unsafe.Pointer, Deallocator, error) { - ptr := f.pool.Get().(unsafe.Pointer) +func (f *fixedSizeSyncPoolAllocator) Allocate(hint Hints) ([]byte, Deallocator, error) { + slice := f.pool.Get().(*[]byte) if hint&NoClear == 0 { - clear(unsafe.Slice((*byte)(ptr), f.size)) - } - return ptr, f, nil -} - -var _ Deallocator = new(fixedSizeSyncPoolAllocator) - -func (f *fixedSizeSyncPoolAllocator) Deallocate(ptr unsafe.Pointer, hint Hints) { - if hint&DoNotReuse > 0 { - return + clear(*slice) } - f.pool.Put(ptr) + return *slice, f.deallocatorPool.Get(fixedSizeSyncPoolDeallocatorArgs{ + slice: slice, + }), nil } diff --git a/pkg/common/malloc/func_deallocator.go b/pkg/common/malloc/func_deallocator.go index 54c032b6533b8..76ea94325f72a 100644 --- a/pkg/common/malloc/func_deallocator.go +++ b/pkg/common/malloc/func_deallocator.go @@ -14,27 +14,10 @@ package malloc -import "unsafe" - -type FuncDeallocator func(ptr unsafe.Pointer) +type FuncDeallocator func(hints Hints) var _ Deallocator = FuncDeallocator(nil) -func (f FuncDeallocator) Deallocate(ptr unsafe.Pointer, hints Hints) { - f(ptr) -} - -type argumentedFuncDeallocator[T any] struct { - argument T - fn func(unsafe.Pointer, Hints, T) -} - -func (a *argumentedFuncDeallocator[T]) SetArgument(arg T) { - a.argument = arg -} - -var _ Deallocator = &argumentedFuncDeallocator[int]{} - -func (a *argumentedFuncDeallocator[T]) Deallocate(ptr unsafe.Pointer, hints Hints) { - a.fn(ptr, hints, a.argument) +func (f FuncDeallocator) Deallocate(hints Hints) { + f(hints) } diff --git a/pkg/common/malloc/fuzz_test.go b/pkg/common/malloc/fuzz_test.go index d3f4259a7ff07..92685d1451b16 100644 --- a/pkg/common/malloc/fuzz_test.go +++ b/pkg/common/malloc/fuzz_test.go @@ -17,7 +17,6 @@ package malloc import ( "testing" "time" - "unsafe" ) func fuzzAllocator( @@ -31,22 +30,38 @@ func fuzzAllocator( return } + // allocate size := i % (8 * GB) - ptr, dec, err := allocator.Allocate(size, NoHints) + slice, dec, err := allocator.Allocate(size, NoHints) if err != nil { t.Fatal(err) } - defer dec.Deallocate(ptr, NoHints) + // length + if len(slice) != int(size) { + t.Fatal() + } + // deallocate + defer dec.Deallocate(NoHints) - slice := unsafe.Slice((*byte)(ptr), size) + // read for _, i := range slice { if i != 0 { t.Fatal() } } + // write for i := range slice { slice[i] = uint8(i) } + // read + for i, j := range slice { + if j != uint8(i) { + t.Fatal() + } + } + + // slice + slice = slice[:len(slice)/2] for i, j := range slice { if j != uint8(i) { t.Fatal() diff --git a/pkg/common/malloc/leaks_tracker.go b/pkg/common/malloc/leaks_tracker.go new file mode 100644 index 0000000000000..05f55cdd62695 --- /dev/null +++ b/pkg/common/malloc/leaks_tracker.go @@ -0,0 +1,84 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package malloc + +import ( + "fmt" + "io" + "runtime" + "sync" + "sync/atomic" +) + +type LeaksTracker struct { + infos sync.Map // stacktrace id -> *TrackInfo +} + +type trackInfo struct { + allocate *ShardedCounter[uint64, atomic.Uint64, *atomic.Uint64] + deallocate *ShardedCounter[uint64, atomic.Uint64, *atomic.Uint64] +} + +func newTrackInfo() *trackInfo { + return &trackInfo{ + allocate: NewShardedCounter[uint64, atomic.Uint64](runtime.GOMAXPROCS(0)), + deallocate: NewShardedCounter[uint64, atomic.Uint64](runtime.GOMAXPROCS(0)), + } +} + +func (t *LeaksTracker) allocate(id uint64) { + v, ok := t.infos.Load(id) + if ok { + info := v.(*trackInfo) + info.allocate.Add(1) + return + } + + v, _ = t.infos.LoadOrStore(id, newTrackInfo()) + v.(*trackInfo).allocate.Add(1) +} + +func (t *LeaksTracker) deallocate(id uint64) { + v, ok := t.infos.Load(id) + if ok { + info := v.(*trackInfo) + info.deallocate.Add(1) + return + } + + v, _ = t.infos.LoadOrStore(id, newTrackInfo()) + v.(*trackInfo).deallocate.Add(1) +} + +func (t *LeaksTracker) ReportLeaks(w io.Writer) (leaks bool) { + t.infos.Range(func(k, v any) bool { + stacktraceID := k.(uint64) + info := v.(*trackInfo) + + allocate := info.allocate.Load() + deallocate := info.deallocate.Load() + if allocate > deallocate { + fmt.Fprintf(w, "missing free: %s\n", stackInfo(stacktraceID)) + leaks = true + } else if deallocate > allocate { + fmt.Fprintf(w, "excessive free: %s\n", stackInfo(stacktraceID)) + leaks = true + } + + return true + }) + + return leaks +} diff --git a/pkg/common/malloc/leaks_tracking_allocator.go b/pkg/common/malloc/leaks_tracking_allocator.go new file mode 100644 index 0000000000000..a9f10fecfe133 --- /dev/null +++ b/pkg/common/malloc/leaks_tracking_allocator.go @@ -0,0 +1,61 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package malloc + +type LeaksTrackingAllocator struct { + upstream Allocator + deallocatorPool *ClosureDeallocatorPool[leaksTrackingDeallocatorArgs] + tracker *LeaksTracker +} + +type leaksTrackingDeallocatorArgs struct { + stacktraceID uint64 +} + +func NewLeaksTrackingAllocator( + upstream Allocator, + tracker *LeaksTracker, +) (ret *LeaksTrackingAllocator) { + + ret = &LeaksTrackingAllocator{ + upstream: upstream, + tracker: tracker, + + deallocatorPool: NewClosureDeallocatorPool( + func(hints Hints, args leaksTrackingDeallocatorArgs) { + ret.tracker.deallocate(args.stacktraceID) + }, + ), + } + + return ret +} + +var _ Allocator = new(LeaksTrackingAllocator) + +func (t *LeaksTrackingAllocator) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) { + slice, dec, err := t.upstream.Allocate(size, hints) + if err != nil { + return nil, nil, err + } + stacktraceID := getStacktraceID(0) + t.tracker.allocate(stacktraceID) + return slice, ChainDeallocator( + dec, + t.deallocatorPool.Get(leaksTrackingDeallocatorArgs{ + stacktraceID: stacktraceID, + }), + ), nil +} diff --git a/pkg/common/malloc/leaks_tracking_allocator_test.go b/pkg/common/malloc/leaks_tracking_allocator_test.go new file mode 100644 index 0000000000000..6cb48b8164b61 --- /dev/null +++ b/pkg/common/malloc/leaks_tracking_allocator_test.go @@ -0,0 +1,85 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package malloc + +import ( + "bytes" + "testing" +) + +func TestLeaksTrackingAllocator(t *testing.T) { + testAllocator(t, func() Allocator { + tracker := new(LeaksTracker) + return NewLeaksTrackingAllocator( + NewClassAllocator(NewFixedSizeMmapAllocator), + tracker, + ) + }) + + t.Run("report", func(t *testing.T) { + tracker := new(LeaksTracker) + allocator := NewLeaksTrackingAllocator( + NewCAllocator(), + tracker, + ) + + _, dec, err := allocator.Allocate(42, NoHints) + if err != nil { + t.Fatal(err) + } + + buf := new(bytes.Buffer) + leaks := tracker.ReportLeaks(buf) + if !leaks { + t.Fatal() + } + if len(buf.Bytes()) == 0 { + t.Fatal() + } + + dec.Deallocate(NoHints) + buf = new(bytes.Buffer) + leaks = tracker.ReportLeaks(buf) + if leaks { + t.Fatal() + } + if len(buf.Bytes()) != 0 { + t.Fatal() + } + + }) +} + +func BenchmarkLeaksTrackingAllocator(b *testing.B) { + for _, n := range benchNs { + benchmarkAllocator(b, func() Allocator { + tracker := new(LeaksTracker) + return NewLeaksTrackingAllocator( + NewClassAllocator(NewFixedSizeMmapAllocator), + tracker, + ) + }, n) + } +} + +func FuzzLeaksTrackingAllocator(f *testing.F) { + fuzzAllocator(f, func() Allocator { + tracker := new(LeaksTracker) + return NewLeaksTrackingAllocator( + NewClassAllocator(NewFixedSizeMmapAllocator), + tracker, + ) + }) +} diff --git a/pkg/common/malloc/metrics_allocator.go b/pkg/common/malloc/metrics_allocator.go index 47f70d6629ced..7ea7b475d132f 100644 --- a/pkg/common/malloc/metrics_allocator.go +++ b/pkg/common/malloc/metrics_allocator.go @@ -15,32 +15,27 @@ package malloc import ( - "sync" - "unsafe" - metric "github.com/matrixorigin/matrixone/pkg/util/metric/v2" ) type MetricsAllocator struct { - upstream Allocator - funcPool sync.Pool + upstream Allocator + deallocatorPool *ClosureDeallocatorPool[metricsDeallocatorArgs] +} + +type metricsDeallocatorArgs struct { + size uint64 } func NewMetricsAllocator(upstream Allocator) *MetricsAllocator { - ret := &MetricsAllocator{ + return &MetricsAllocator{ upstream: upstream, + deallocatorPool: NewClosureDeallocatorPool( + func(hints Hints, args metricsDeallocatorArgs) { + metric.MallocCounterFreeBytes.Add(float64(args.size)) + }, + ), } - ret.funcPool = sync.Pool{ - New: func() any { - argumented := new(argumentedFuncDeallocator[uint64]) - argumented.fn = func(_ unsafe.Pointer, hints Hints, size uint64) { - metric.MallocCounterFreeBytes.Add(float64(size)) - ret.funcPool.Put(argumented) - } - return argumented - }, - } - return ret } type AllocateInfo struct { @@ -50,13 +45,17 @@ type AllocateInfo struct { var _ Allocator = new(MetricsAllocator) -func (m *MetricsAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, Deallocator, error) { +func (m *MetricsAllocator) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) { ptr, dec, err := m.upstream.Allocate(size, hints) if err != nil { return nil, nil, err } metric.MallocCounterAllocateBytes.Add(float64(size)) - fn := m.funcPool.Get().(*argumentedFuncDeallocator[uint64]) - fn.SetArgument(size) - return ptr, ChainDeallocator(dec, fn), nil + + return ptr, ChainDeallocator( + dec, + m.deallocatorPool.Get(metricsDeallocatorArgs{ + size: size, + }), + ), nil } diff --git a/pkg/common/malloc/profile_allocator.go b/pkg/common/malloc/profile_allocator.go index 0219a55f0bcf1..5e73fc461f9c0 100644 --- a/pkg/common/malloc/profile_allocator.go +++ b/pkg/common/malloc/profile_allocator.go @@ -16,9 +16,7 @@ package malloc import ( "runtime" - "sync" "sync/atomic" - "unsafe" "github.com/google/pprof/profile" ) @@ -74,10 +72,10 @@ func (h *HeapSampleValues) Values() []int64 { } type ProfileAllocator struct { - upstream Allocator - profiler *Profiler[HeapSampleValues, *HeapSampleValues] - fraction uint32 - funcPool sync.Pool + upstream Allocator + profiler *Profiler[HeapSampleValues, *HeapSampleValues] + fraction uint32 + deallocatorPool *ClosureDeallocatorPool[profileDeallocateArgs] } func NewProfileAllocator( @@ -85,25 +83,18 @@ func NewProfileAllocator( profiler *Profiler[HeapSampleValues, *HeapSampleValues], fraction uint32, ) *ProfileAllocator { - ret := &ProfileAllocator{ + return &ProfileAllocator{ upstream: upstream, profiler: profiler, fraction: fraction, - } - ret.funcPool = sync.Pool{ - New: func() any { - argumented := new(argumentedFuncDeallocator[profileDeallocateArgs]) - argumented.fn = func(_ unsafe.Pointer, hints Hints, args profileDeallocateArgs) { + deallocatorPool: NewClosureDeallocatorPool( + func(hints Hints, args profileDeallocateArgs) { args.values.InuseBytes.Add(int64(-args.size)) args.values.InuseObjects.Add(-1) - ret.funcPool.Put(argumented) - } - return argumented - }, + }, + ), } - - return ret } type profileDeallocateArgs struct { @@ -113,7 +104,7 @@ type profileDeallocateArgs struct { var _ Allocator = new(ProfileAllocator) -func (p *ProfileAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, Deallocator, error) { +func (p *ProfileAllocator) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) { ptr, dec, err := p.upstream.Allocate(size, hints) if err != nil { return nil, nil, err @@ -124,10 +115,11 @@ func (p *ProfileAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, D values.AllocatedObjects.Add(1) values.InuseBytes.Add(int64(size)) values.InuseObjects.Add(int64(1)) - fn := p.funcPool.Get().(*argumentedFuncDeallocator[profileDeallocateArgs]) - fn.SetArgument(profileDeallocateArgs{ - values: values, - size: size, - }) - return ptr, ChainDeallocator(dec, fn), nil + return ptr, ChainDeallocator( + dec, + p.deallocatorPool.Get(profileDeallocateArgs{ + values: values, + size: size, + }), + ), nil } diff --git a/pkg/common/malloc/sharded_allocator.go b/pkg/common/malloc/sharded_allocator.go index 5e0cd43d013b5..45d0f5a78d2cd 100644 --- a/pkg/common/malloc/sharded_allocator.go +++ b/pkg/common/malloc/sharded_allocator.go @@ -14,8 +14,6 @@ package malloc -import "unsafe" - type ShardedAllocator []Allocator func NewShardedAllocator(numShards int, newShard func() Allocator) ShardedAllocator { @@ -28,7 +26,7 @@ func NewShardedAllocator(numShards int, newShard func() Allocator) ShardedAlloca var _ Allocator = ShardedAllocator{} -func (s ShardedAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, Deallocator, error) { +func (s ShardedAllocator) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) { pid := runtime_procPin() runtime_procUnpin() return s[pid%len(s)].Allocate(size, hints) diff --git a/pkg/common/malloc/sharded_counter.go b/pkg/common/malloc/sharded_counter.go index 2b369ab97c3ed..5122cd974eb6d 100644 --- a/pkg/common/malloc/sharded_counter.go +++ b/pkg/common/malloc/sharded_counter.go @@ -14,6 +14,8 @@ package malloc +import "golang.org/x/sys/cpu" + type AtomicInteger[T any] interface { Add(T) T Load() T @@ -27,7 +29,12 @@ type ShardedCounter[T CounterInteger, A any, P interface { *A AtomicInteger[T] }] struct { - shards []A + shards []shardedCounterShard[A] +} + +type shardedCounterShard[T any] struct { + value T + _ cpu.CacheLinePad } func NewShardedCounter[T CounterInteger, A any, P interface { @@ -35,7 +42,7 @@ func NewShardedCounter[T CounterInteger, A any, P interface { AtomicInteger[T] }](shards int) *ShardedCounter[T, A, P] { return &ShardedCounter[T, A, P]{ - shards: make([]A, shards), + shards: make([]shardedCounterShard[A], shards), } } @@ -43,12 +50,12 @@ func (s *ShardedCounter[T, A, P]) Add(v T) { pid := runtime_procPin() runtime_procUnpin() shard := pid % len(s.shards) - P(&s.shards[shard]).Add(v) + P(&s.shards[shard].value).Add(v) } func (s *ShardedCounter[T, A, P]) Load() (ret T) { for i := 0; i < len(s.shards); i++ { - ret += P(&s.shards[i]).Load() + ret += P(&s.shards[i].value).Load() } return ret } diff --git a/pkg/common/malloc/size_bounded_allocator.go b/pkg/common/malloc/size_bounded_allocator.go index 349ab56a771f7..69e8e0c8eb2fa 100644 --- a/pkg/common/malloc/size_bounded_allocator.go +++ b/pkg/common/malloc/size_bounded_allocator.go @@ -15,40 +15,37 @@ package malloc import ( - "sync" "sync/atomic" - "unsafe" "github.com/matrixorigin/matrixone/pkg/common/moerr" ) type SizeBoundedAllocator struct { - upstream Allocator - max uint64 - counter *atomic.Uint64 - funcPool sync.Pool + upstream Allocator + max uint64 + counter *atomic.Uint64 + deallocatorPool *ClosureDeallocatorPool[sizeBoundedDeallocatorArgs] } -func NewSizeBoundedAllocator(upstream Allocator, maxSize uint64, counter *atomic.Uint64) *SizeBoundedAllocator { +type sizeBoundedDeallocatorArgs struct { + size uint64 +} + +func NewSizeBoundedAllocator(upstream Allocator, maxSize uint64, counter *atomic.Uint64) (ret *SizeBoundedAllocator) { if counter == nil { counter = new(atomic.Uint64) } - ret := &SizeBoundedAllocator{ + ret = &SizeBoundedAllocator{ max: maxSize, upstream: upstream, counter: counter, - } - ret.funcPool = sync.Pool{ - New: func() any { - argumented := new(argumentedFuncDeallocator[uint64]) - argumented.fn = func(_ unsafe.Pointer, hints Hints, size uint64) { - ret.counter.Add(-size) - ret.funcPool.Put(argumented) - } - return argumented - }, + deallocatorPool: NewClosureDeallocatorPool( + func(hints Hints, args sizeBoundedDeallocatorArgs) { + ret.counter.Add(-args.size) + }, + ), } return ret @@ -56,7 +53,7 @@ func NewSizeBoundedAllocator(upstream Allocator, maxSize uint64, counter *atomic var _ Allocator = new(SizeBoundedAllocator) -func (s *SizeBoundedAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, Deallocator, error) { +func (s *SizeBoundedAllocator) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) { for { cur := s.counter.Load() @@ -77,9 +74,12 @@ func (s *SizeBoundedAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointe return nil, nil, err } - fn := s.funcPool.Get().(*argumentedFuncDeallocator[uint64]) - fn.SetArgument(size) - return ptr, ChainDeallocator(dec, fn), nil + return ptr, ChainDeallocator( + dec, + s.deallocatorPool.Get(sizeBoundedDeallocatorArgs{ + size: size, + }), + ), nil } } diff --git a/pkg/common/malloc/size_bounded_allocator_test.go b/pkg/common/malloc/size_bounded_allocator_test.go index 7d690e7f06eab..cad11c4ede36f 100644 --- a/pkg/common/malloc/size_bounded_allocator_test.go +++ b/pkg/common/malloc/size_bounded_allocator_test.go @@ -35,13 +35,13 @@ func TestSizeBoundedAllocator(t *testing.T) { 24, nil, ) - p1, d1, err := allocator.Allocate(12, NoHints) + _, d1, err := allocator.Allocate(12, NoHints) assert.Nil(t, err) _, _, err = allocator.Allocate(12, NoHints) assert.Nil(t, err) _, _, err = allocator.Allocate(12, NoHints) assert.ErrorContains(t, err, "out of space") - d1.Deallocate(p1, NoHints) + d1.Deallocate(NoHints) _, _, err = allocator.Allocate(12, NoHints) assert.Nil(t, err) }) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index dbabf2cbf8c94..f52b03ae287b3 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -15,8 +15,6 @@ package fileservice import ( - "unsafe" - "github.com/matrixorigin/matrixone/pkg/common/malloc" "github.com/matrixorigin/matrixone/pkg/fileservice/memorycache" metric "github.com/matrixorigin/matrixone/pkg/util/metric/v2" @@ -24,7 +22,6 @@ import ( type Bytes struct { bytes []byte - ptr unsafe.Pointer deallocator malloc.Deallocator } @@ -43,7 +40,7 @@ func (b Bytes) Slice(length int) memorycache.CacheData { func (b Bytes) Release() { if b.deallocator != nil { - b.deallocator.Deallocate(b.ptr, malloc.NoHints) + b.deallocator.Deallocate(malloc.NoHints) metric.FSMallocLiveObjectsBytes.Dec() } } @@ -55,14 +52,13 @@ type bytesAllocator struct { var _ CacheDataAllocator = new(bytesAllocator) func (b *bytesAllocator) Alloc(size int) memorycache.CacheData { - ptr, dec, err := b.allocator.Allocate(uint64(size), malloc.NoHints) + slice, dec, err := b.allocator.Allocate(uint64(size), malloc.NoHints) if err != nil { panic(err) } metric.FSMallocLiveObjectsBytes.Inc() return Bytes{ - bytes: unsafe.Slice((*byte)(ptr), size), - ptr: ptr, + bytes: slice, deallocator: dec, } } diff --git a/pkg/fileservice/io_entry.go b/pkg/fileservice/io_entry.go index e00f3e4643ba8..86208b5a5113e 100644 --- a/pkg/fileservice/io_entry.go +++ b/pkg/fileservice/io_entry.go @@ -20,7 +20,6 @@ import ( "io" "os" "time" - "unsafe" "github.com/matrixorigin/matrixone/pkg/common/malloc" "github.com/matrixorigin/matrixone/pkg/fileservice/memorycache" @@ -96,22 +95,22 @@ func CacheOriginalData(r io.Reader, data []byte, allocator CacheDataAllocator) ( func (i *IOEntry) prepareData() (finally func(err *error)) { if cap(i.Data) < int(i.Size) { - ptr, dec, err := getMallocAllocator().Allocate(uint64(i.Size), malloc.NoHints) + slice, dec, err := getMallocAllocator().Allocate(uint64(i.Size), malloc.NoHints) if err != nil { panic(err) } metric.FSMallocLiveObjectsIOEntryData.Inc() - i.Data = unsafe.Slice((*byte)(ptr), i.Size) + i.Data = slice if i.releaseData != nil { i.releaseData() } i.releaseData = func() { - dec.Deallocate(ptr, malloc.NoHints) + dec.Deallocate(malloc.NoHints) metric.FSMallocLiveObjectsIOEntryData.Dec() } finally = func(err *error) { if err != nil && *err != nil { - dec.Deallocate(ptr, malloc.NoHints) + dec.Deallocate(malloc.NoHints) metric.FSMallocLiveObjectsIOEntryData.Dec() } } diff --git a/pkg/fileservice/memorycache/data.go b/pkg/fileservice/memorycache/data.go index 47fb6d97358a6..01d71822d72af 100644 --- a/pkg/fileservice/memorycache/data.go +++ b/pkg/fileservice/memorycache/data.go @@ -16,7 +16,6 @@ package memorycache import ( "sync/atomic" - "unsafe" "github.com/matrixorigin/matrixone/pkg/common/malloc" metric "github.com/matrixorigin/matrixone/pkg/util/metric/v2" @@ -29,7 +28,6 @@ type Data struct { // reference counta for the Data, the Data is free // when the reference count is 0 ref refcnt - ptr unsafe.Pointer deallocator malloc.Deallocator globalSize *atomic.Int64 } @@ -50,12 +48,11 @@ func newData( globalSize: globalSize, } var err error - data.ptr, data.deallocator, err = allocator.Allocate(uint64(size), malloc.NoHints) + data.buf, data.deallocator, err = allocator.Allocate(uint64(size), malloc.NoHints) if err != nil { panic(err) } metric.FSMallocLiveObjectsMemoryCache.Inc() - data.buf = unsafe.Slice((*byte)(data.ptr), size) data.ref.init(1) return data } @@ -63,7 +60,7 @@ func newData( func (d *Data) free() { d.globalSize.Add(-int64(d.size)) d.buf = nil - d.deallocator.Deallocate(d.ptr, malloc.NoHints) + d.deallocator.Deallocate(malloc.NoHints) metric.FSMallocLiveObjectsMemoryCache.Dec() }