Skip to content

Commit

Permalink
malloc: optimizations
Browse files Browse the repository at this point in the history
malloc: return []byte instead of unsafe.Pointer in Allocator.Allocate

malloc: remove pointer param from Deallocator.Deallocate

malloc: rename argumentedFuncDeallocator to closureDeallocator; add closureDeallocatorPool

malloc: expose closure allocator and pool

malloc: add LeaksTrackingAllocator
  • Loading branch information
reusee committed Jun 26, 2024
1 parent dae1706 commit 56a10eb
Show file tree
Hide file tree
Showing 25 changed files with 551 additions and 241 deletions.
6 changes: 2 additions & 4 deletions pkg/common/malloc/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@

package malloc

import "unsafe"

type Allocator interface {
Allocate(size uint64, hint Hints) (unsafe.Pointer, Deallocator, error)
Allocate(size uint64, hint Hints) ([]byte, Deallocator, error)
}

type Deallocator interface {
Deallocate(ptr unsafe.Pointer, hint Hints)
Deallocate(hint Hints)
}
8 changes: 4 additions & 4 deletions pkg/common/malloc/allocator_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func benchmarkAllocator(
allcator := newAllocator()
b.ResetTimer()
for i := 0; i < b.N; i++ {
ptr, dec, err := allcator.Allocate(n, NoHints)
_, dec, err := allcator.Allocate(n, NoHints)
if err != nil {
b.Fatal(err)
}
dec.Deallocate(ptr, NoHints)
dec.Deallocate(NoHints)
}
})

Expand All @@ -46,11 +46,11 @@ func benchmarkAllocator(
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ptr, dec, err := allcator.Allocate(n, NoHints)
_, dec, err := allcator.Allocate(n, NoHints)
if err != nil {
b.Fatal(err)
}
dec.Deallocate(ptr, NoHints)
dec.Deallocate(NoHints)
}
})
})
Expand Down
27 changes: 23 additions & 4 deletions pkg/common/malloc/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package malloc
import (
"math"
"testing"
"unsafe"
)

func testAllocator(
Expand All @@ -29,20 +28,40 @@ func testAllocator(
t.Run("allocate", func(t *testing.T) {
allocator := newAllocator()
for i := uint64(1); i < 128*MB; i = uint64(math.Ceil(float64(i) * 1.1)) {
ptr, dec, err := allocator.Allocate(i, NoHints)
// allocate
slice, dec, err := allocator.Allocate(i, NoHints)
if err != nil {
t.Fatal(err)
}
slice := unsafe.Slice((*byte)(ptr), i)
// len
if len(slice) != int(i) {
t.Fatal()
}
// read
for _, i := range slice {
if i != 0 {
t.Fatal("not zeroed")
}
}
// write
for i := range slice {
slice[i] = byte(i)
}
dec.Deallocate(ptr, NoHints)
// read
for i := range slice {
if slice[i] != byte(i) {
t.Fatal()
}
}
// slice
slice = slice[:len(slice)/2]
for i := range slice {
if slice[i] != byte(i) {
t.Fatal()
}
}
// deallocate
dec.Deallocate(NoHints)
}
})

Expand Down
26 changes: 17 additions & 9 deletions pkg/common/malloc/c_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,32 @@ import "unsafe"
import "C"

type CAllocator struct {
deallocatorPool *ClosureDeallocatorPool[cDeallocatorArgs]
}

type cDeallocatorArgs struct {
ptr unsafe.Pointer
}

func NewCAllocator() *CAllocator {
return &CAllocator{}
return &CAllocator{
deallocatorPool: NewClosureDeallocatorPool(
func(hints Hints, args cDeallocatorArgs) {
C.free(args.ptr)
},
),
}
}

var _ Allocator = new(CAllocator)

func (c *CAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, Deallocator, error) {
func (c *CAllocator) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) {
ptr := C.malloc(C.ulong(size))
if hints&NoClear == 0 {
clear(unsafe.Slice((*byte)(ptr), size))
}
return ptr, c, nil
}

var _ Deallocator = new(CAllocator)

func (c *CAllocator) Deallocate(ptr unsafe.Pointer, hints Hints) {
C.free(ptr)
slice := unsafe.Slice((*byte)(ptr), size)
return slice, c.deallocatorPool.Get(cDeallocatorArgs{
ptr: ptr,
}), nil
}
5 changes: 2 additions & 3 deletions pkg/common/malloc/chain_deallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package malloc

import (
"sync"
"unsafe"
)

type chainDeallocator []Deallocator

var _ Deallocator = &chainDeallocator{}

func (c *chainDeallocator) Deallocate(ptr unsafe.Pointer, hints Hints) {
func (c *chainDeallocator) Deallocate(hints Hints) {
for i := len(*c) - 1; i >= 0; i-- {
(*c)[i].Deallocate(ptr, hints)
(*c)[i].Deallocate(hints)
}
*c = (*c)[:0]
chainDeallocatorPool.Put(c)
Expand Down
37 changes: 16 additions & 21 deletions pkg/common/malloc/checked_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,59 +17,53 @@ package malloc
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"unsafe"
)

type CheckedAllocator struct {
upstream Allocator
fraction uint32
funcPool sync.Pool
upstream Allocator
fraction uint32
deallocatorPool *ClosureDeallocatorPool[checkedAllocatorArgs]
}

type checkedAllocatorArgs struct {
deallocated *atomic.Bool
deallocator Deallocator
stackID uint64
size uint64
ptr unsafe.Pointer
}

func NewCheckedAllocator(upstream Allocator, fraction uint32) *CheckedAllocator {
ret := &CheckedAllocator{
return &CheckedAllocator{
upstream: upstream,
fraction: fraction,
}

ret.funcPool = sync.Pool{
New: func() any {
argumented := new(argumentedFuncDeallocator[checkedAllocatorArgs])
argumented.fn = func(ptr unsafe.Pointer, hints Hints, args checkedAllocatorArgs) {
deallocatorPool: NewClosureDeallocatorPool(
func(hints Hints, args checkedAllocatorArgs) {

if !args.deallocated.CompareAndSwap(false, true) {
panic(fmt.Sprintf(
"double free: address %p, size %v, allocated at %s",
ptr,
args.ptr,
args.size,
stackInfo(args.stackID),
))
}

hints |= DoNotReuse
args.deallocator.Deallocate(ptr, hints)
args.deallocator.Deallocate(hints)

ret.funcPool.Put(argumented)
}
return argumented
},
},
),
}

return ret
}

var _ Allocator = new(CheckedAllocator)

func (c *CheckedAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, Deallocator, error) {
func (c *CheckedAllocator) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) {
ptr, dec, err := c.upstream.Allocate(size, hints)
if err != nil {
return nil, nil, err
Expand All @@ -92,12 +86,13 @@ func (c *CheckedAllocator) Allocate(size uint64, hints Hints) (unsafe.Pointer, D
}
})

fn := c.funcPool.Get().(*argumentedFuncDeallocator[checkedAllocatorArgs])
fn.SetArgument(checkedAllocatorArgs{
dec = c.deallocatorPool.Get(checkedAllocatorArgs{
deallocated: deallocated,
deallocator: dec,
stackID: stackID,
size: size,
ptr: unsafe.Pointer(unsafe.SliceData(ptr)),
})
return ptr, fn, nil

return ptr, dec, nil
}
14 changes: 6 additions & 8 deletions pkg/common/malloc/checked_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"runtime"
"strings"
"testing"
"unsafe"
)

func TestCheckedAllocator(t *testing.T) {
Expand All @@ -42,7 +41,7 @@ func TestCheckedAllocator(t *testing.T) {
}
// comment the following line to trigger a missing-free panic
// this panic will be raised in SetFinalizer func so it's not recoverable and not testable
dec.Deallocate(ptr, NoHints)
dec.Deallocate(NoHints)
_ = ptr
_ = dec
runtime.GC()
Expand All @@ -64,12 +63,12 @@ func TestCheckedAllocator(t *testing.T) {
NewClassAllocator(NewFixedSizeMmapAllocator),
1,
)
ptr, dec, err := allocator.Allocate(42, NoHints)
_, dec, err := allocator.Allocate(42, NoHints)
if err != nil {
t.Fatal(err)
}
dec.Deallocate(ptr, NoHints)
dec.Deallocate(ptr, NoHints)
dec.Deallocate(NoHints)
dec.Deallocate(NoHints)
})

// use after free
Expand All @@ -78,15 +77,14 @@ func TestCheckedAllocator(t *testing.T) {
NewClassAllocator(NewFixedSizeMmapAllocator),
1,
)
ptr, dec, err := allocator.Allocate(42, NoHints)
slice, dec, err := allocator.Allocate(42, NoHints)
if err != nil {
t.Fatal(err)
}
slice := unsafe.Slice((*byte)(ptr), 42)
for i := range slice {
slice[i] = byte(i)
}
dec.Deallocate(ptr, NoHints)
dec.Deallocate(NoHints)
// zero or segfault
//for i := range slice {
// if slice[i] != 0 {
Expand Down
10 changes: 7 additions & 3 deletions pkg/common/malloc/class_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package malloc

import (
"math/bits"
"unsafe"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
)
Expand Down Expand Up @@ -52,7 +51,7 @@ func NewClassAllocator[T FixedSizeAllocator](

var _ Allocator = new(ClassAllocator[*fixedSizeMmapAllocator])

func (c *ClassAllocator[T]) Allocate(size uint64, hints Hints) (unsafe.Pointer, Deallocator, error) {
func (c *ClassAllocator[T]) Allocate(size uint64, hints Hints) ([]byte, Deallocator, error) {
if size == 0 {
return nil, nil, moerr.NewInternalErrorNoCtx("invalid allocate size: 0")
}
Expand All @@ -68,5 +67,10 @@ func (c *ClassAllocator[T]) Allocate(size uint64, hints Hints) (unsafe.Pointer,
if i >= len(c.classes) {
return nil, nil, moerr.NewInternalErrorNoCtx("cannot allocate %v bytes: too large", size)
}
return c.classes[i].allocator.Allocate(hints)
slice, dec, err := c.classes[i].allocator.Allocate(hints)
if err != nil {
return nil, nil, err
}
slice = slice[:size]
return slice, dec, nil
}
61 changes: 61 additions & 0 deletions pkg/common/malloc/closure_deallocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package malloc

import (
"sync"
)

type ClosureDeallocator[T any] struct {
argument T
fn func(Hints, T)
}

func (a *ClosureDeallocator[T]) SetArgument(arg T) {
a.argument = arg
}

var _ Deallocator = &ClosureDeallocator[int]{}

func (a *ClosureDeallocator[T]) Deallocate(hints Hints) {
a.fn(hints, a.argument)
}

type ClosureDeallocatorPool[T any] struct {
pool sync.Pool
}

func NewClosureDeallocatorPool[T any](
deallocateFunc func(Hints, T),
) *ClosureDeallocatorPool[T] {
ret := new(ClosureDeallocatorPool[T])

ret.pool.New = func() any {
closure := new(ClosureDeallocator[T])
closure.fn = func(hints Hints, args T) {
deallocateFunc(hints, args)
ret.pool.Put(closure)
}
return closure
}

return ret
}

func (c *ClosureDeallocatorPool[T]) Get(args T) Deallocator {
closure := c.pool.Get().(*ClosureDeallocator[T])
closure.SetArgument(args)
return closure
}
4 changes: 1 addition & 3 deletions pkg/common/malloc/fixed_size_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

package malloc

import "unsafe"

type FixedSizeAllocator interface {
Allocate(hint Hints) (unsafe.Pointer, Deallocator, error)
Allocate(hint Hints) ([]byte, Deallocator, error)
}
Loading

0 comments on commit 56a10eb

Please sign in to comment.