diff --git a/.github/dco.yml b/.github/dco.yml new file mode 100644 index 0000000..2ad867c --- /dev/null +++ b/.github/dco.yml @@ -0,0 +1,4 @@ +allowRemediationCommits: + individual: true +require: + members: false \ No newline at end of file diff --git a/go.mod b/go.mod index bf15ef7..3280e9e 100644 --- a/go.mod +++ b/go.mod @@ -2,27 +2,12 @@ module github.com/loopholelabs/common go 1.18 -require ( - github.com/google/uuid v1.6.0 - github.com/minio/minio-go/v7 v7.0.76 - github.com/stretchr/testify v1.9.0 -) +require github.com/stretchr/testify v1.9.0 require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/dustin/go-humanize v1.0.1 // indirect - github.com/go-ini/ini v1.67.0 // indirect - github.com/goccy/go-json v0.10.3 // indirect - github.com/klauspost/compress v1.17.9 // indirect - github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/kr/pretty v0.3.1 // indirect - github.com/minio/md5-simd v1.1.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rs/xid v1.6.0 // indirect - golang.org/x/crypto v0.27.0 // indirect - golang.org/x/net v0.29.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/text v0.18.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 0868957..c18bab0 100644 --- a/go.sum +++ b/go.sum @@ -1,19 +1,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= -github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= -github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= -github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= -github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= -github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -21,28 +8,13 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= -github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= -github.com/minio/minio-go/v7 v7.0.76 h1:9nxHH2XDai61cT/EFhyIw/wW4vJfpPNvl7lSFpRt+Ng= -github.com/minio/minio-go/v7 v7.0.76/go.mod h1:AVM3IUN6WwKzmwBxVdjzhH8xq+f57JSbbvzqvUzR6eg= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= -github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go deleted file mode 100644 index c5f342d..0000000 --- a/pkg/chunk/chunk.go +++ /dev/null @@ -1,100 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package chunk - -import ( - "context" - "io" - "sync" - - "github.com/minio/minio-go/v7" - - "github.com/loopholelabs/common/pkg/pool" -) - -var ( - chunkPool = pool.NewPool[Chunk, *Chunk](func() *Chunk { - return new(Chunk) - }) -) - -// Chunk manages downloading a single chunk of data from a remote server -type Chunk struct { - // client is the S3 client to use for downloading the chunk - client *minio.Client - - // ctx is the context to use for the download - ctx context.Context - - // bucket is the S3 bucket to download the chunk from - bucket string - - // key is the S3 key to download the chunk from - key string - - // opts are the options to use for the download - opts *minio.GetObjectOptions - - // res is the S3 response from the download - obj *minio.Object - - // data is the data downloaded from the remote server - data []byte - - // err is the error that occurred while downloading the chunk - err error - - // wg is the wait group used to wait for the chunk to finish downloading - wg *sync.WaitGroup -} - -func GetChunk(client *minio.Client, ctx context.Context, offset int64, size int64, bucket string, key string) (*Chunk, error) { - c := chunkPool.Get() - c.client = client - c.ctx = ctx - c.bucket = bucket - c.key = key - - c.opts = new(minio.GetObjectOptions) - err := c.opts.SetRange(offset, offset+size-1) - if err != nil { - return nil, err - } - - c.wg = new(sync.WaitGroup) - c.wg.Add(1) - go c.do() - return c, nil -} - -func ReturnChunk(c *Chunk) { - chunkPool.Put(c) -} - -func (c *Chunk) do() { - c.obj, c.err = c.client.GetObject(c.ctx, c.bucket, c.key, *c.opts) - if c.err == nil { - c.data, c.err = io.ReadAll(c.obj) - _ = c.obj.Close() - } - c.wg.Done() -} - -func (c *Chunk) Wait() ([]byte, error) { - c.wg.Wait() - return c.data, c.err -} - -func (c *Chunk) Reset() { - c.client = nil - c.ctx = nil - c.opts = nil - c.obj = nil - c.data = nil - c.err = nil - c.wg = nil -} - -func (c *Chunk) Return() { - ReturnChunk(c) -} diff --git a/pkg/chunk/chunk_test.go b/pkg/chunk/chunk_test.go deleted file mode 100644 index 66b94e9..0000000 --- a/pkg/chunk/chunk_test.go +++ /dev/null @@ -1,133 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package chunk - -import ( - "bytes" - "context" - "crypto/rand" - "sync" - "testing" - - "github.com/google/uuid" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/stretchr/testify/require" -) - -func testClient(t *testing.T) (*minio.Client, []byte, string, string) { - client, err := minio.New("play.min.io", &minio.Options{ - Creds: credentials.NewStaticV4("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG", ""), - Secure: true, - }) - require.NoError(t, err) - - bucketName := uuid.New().String() - err = client.MakeBucket(context.Background(), bucketName, minio.MakeBucketOptions{ - Region: "us-east-1", - }) - require.NoError(t, err) - - objectName := uuid.New().String() - objectContent := make([]byte, 1024*1024) - _, err = rand.Read(objectContent) - require.NoError(t, err) - - _, err = client.PutObject(context.Background(), bucketName, objectName, bytes.NewReader(objectContent), int64(len(objectContent)), minio.PutObjectOptions{ - ContentType: "application/octet-stream", - }) - require.NoError(t, err) - - t.Cleanup(func() { - err = client.RemoveObject(context.Background(), bucketName, objectName, minio.RemoveObjectOptions{}) - require.NoError(t, err) - err = client.RemoveBucket(context.Background(), bucketName) - require.NoError(t, err) - }) - - return client, objectContent, bucketName, objectName -} - -func TestChunk(t *testing.T) { - client, data, bucket, obj := testClient(t) - var offset int64 = 0 - const chunkSize = 512 - - chunk, err := GetChunk(client, context.Background(), offset, chunkSize, bucket, obj) - require.NoError(t, err) - - downloadedData, err := chunk.Wait() - require.NoError(t, err) - require.Equal(t, chunkSize, len(downloadedData)) - require.Equal(t, data[:chunkSize], downloadedData) - - chunk.Return() - - offset += chunkSize * 2 - chunk, err = GetChunk(client, context.Background(), offset, chunkSize, bucket, obj) - require.NoError(t, err) - - downloadedData, err = chunk.Wait() - require.NoError(t, err) - require.Equal(t, chunkSize, len(downloadedData)) - require.Equal(t, data[offset:offset+chunkSize], downloadedData) - - chunk.Return() -} - -func TestConcurrentChunk(t *testing.T) { - client, data, bucket, obj := testClient(t) - const offset = 32 - const chunkSize = 512 - - chunk, err := GetChunk(client, context.Background(), offset, chunkSize, bucket, obj) - require.NoError(t, err) - - start := make(chan struct{}) - wg := sync.WaitGroup{} - - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - <-start - downloadedData, err := chunk.Wait() - require.NoError(t, err) - require.Equal(t, chunkSize, len(downloadedData)) - require.Equal(t, data[offset:offset+chunkSize], downloadedData) - wg.Done() - }() - } - - close(start) - wg.Wait() - - chunk.Return() -} - -func TestInvalidChunkOffset(t *testing.T) { - client, data, bucket, obj := testClient(t) - var offset = len(data) + 1 - const chunkSize = 512 - - chunk, err := GetChunk(client, context.Background(), int64(offset), chunkSize, bucket, obj) - require.NoError(t, err) - - _, err = chunk.Wait() - require.Error(t, err) - chunk.Return() -} - -func TestInvalidChunkSize(t *testing.T) { - client, data, bucket, obj := testClient(t) - const offset = 512 - var chunkSize = len(data) + 1 - - chunk, err := GetChunk(client, context.Background(), offset, int64(chunkSize), bucket, obj) - require.NoError(t, err) - - downloadedData, err := chunk.Wait() - require.NoError(t, err) - require.Equal(t, len(data)-offset, len(downloadedData)) - require.Equal(t, data[offset:], downloadedData) - chunk.Return() -} diff --git a/pkg/hashlock/hashlock.go b/pkg/hashlock/hashlock.go new file mode 100644 index 0000000..2576de3 --- /dev/null +++ b/pkg/hashlock/hashlock.go @@ -0,0 +1,113 @@ +// SPDX-License-Identifier: Apache-2.0 + +package hashlock + +import ( + "context" + "sync" + "time" +) + +const ( + // DefaultTimeout is the default timeout for locks + DefaultTimeout = time.Second +) + +var ( + // GCTime is the time between garbage collection runs + GCTime = time.Minute +) + +type Lock struct { + ch chan struct{} + mu sync.RWMutex +} + +type HashLock[T comparable] struct { + locks map[T]*Lock + mu sync.Mutex + timeout time.Duration + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +func New[T comparable](d time.Duration) *HashLock[T] { + if d < 0 { + d = DefaultTimeout + } + + ctx, cancel := context.WithCancel(context.Background()) + h := &HashLock[T]{ + locks: make(map[T]*Lock), + timeout: d, + ctx: ctx, + cancel: cancel, + } + + h.wg.Add(1) + go h.gc() + + return h +} + +func (l *HashLock[T]) Close() { + l.cancel() + l.wg.Wait() +} + +func (l *HashLock[T]) Lock(key T) { + lock := l.get(key) + lock.ch <- struct{}{} + lock.mu.RUnlock() + if l.timeout > 0 { + time.AfterFunc(l.timeout, func() { + l.Unlock(key) + }) + } +} + +func (l *HashLock[T]) Unlock(key T) { + lock := l.get(key) + select { + case <-lock.ch: + default: + } + lock.mu.RUnlock() +} + +// get returns the lock.mu in Rlock mode, so it must be unlocked by the caller - otherwise the +// garbage collector will not be able to clean it up +func (l *HashLock[T]) get(key T) *Lock { + l.mu.Lock() + lock, found := l.locks[key] + if !found { + lock = &Lock{ch: make(chan struct{}, 1)} + l.locks[key] = lock + } + lock.mu.RLock() + l.mu.Unlock() + return lock +} + +func (l *HashLock[T]) gc() { + for { + select { + case <-l.ctx.Done(): + l.wg.Done() + return + case <-time.After(GCTime): + l.mu.Lock() + for k, v := range l.locks { + if v.mu.TryLock() { + if len(v.ch) == 0 { + delete(l.locks, k) + } + v.mu.Unlock() + } + } + l.mu.Unlock() + } + } +} diff --git a/pkg/hashlock/hashlock_test.go b/pkg/hashlock/hashlock_test.go new file mode 100644 index 0000000..6b93973 --- /dev/null +++ b/pkg/hashlock/hashlock_test.go @@ -0,0 +1,94 @@ +// SPDX-License-Identifier: Apache-2.0 + +package hashlock + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +const iterations = 10 + +func TestLockUnlock(t *testing.T) { + h := New[string](0) + t.Cleanup(func() { h.Close() }) + + var wg sync.WaitGroup + for i := 0; i < iterations; i++ { + wg.Add(1) + go func(i int) { + time.Sleep(time.Duration(iterations-i) * time.Millisecond) + h.Lock(t.Name()) + time.Sleep(time.Duration(iterations-i) * time.Millisecond) + h.Unlock(t.Name()) + wg.Done() + }(i) + } + wg.Wait() +} + +func TestDoubleUnlock(t *testing.T) { + h := New[string](0) + t.Cleanup(func() { h.Close() }) + h.Lock(t.Name()) + h.Unlock(t.Name()) + h.Unlock(t.Name()) +} + +func TestUnlockWithoutLock(t *testing.T) { + h := New[string](0) + t.Cleanup(func() { h.Close() }) + h.Unlock(t.Name()) +} + +func TestTimeout(t *testing.T) { + h := New[string](DefaultTimeout) + t.Cleanup(func() { h.Close() }) + startTime := time.Now() + h.Lock(t.Name()) + h.Lock(t.Name()) + endTime := time.Now() + require.GreaterOrEqual(t, endTime.Sub(startTime), DefaultTimeout) +} + +func TestRelock(t *testing.T) { + h := New[string](DefaultTimeout) + t.Cleanup(func() { h.Close() }) + h.Lock(t.Name()) + h.Unlock(t.Name()) + startTime := time.Now() + h.Lock(t.Name()) + endTime := time.Now() + require.Less(t, endTime.Sub(startTime), time.Millisecond) +} + +func (l *HashLock[T]) LockHacked(key T) { + lock := l.get(key) + time.Sleep(3 * time.Second) + lock.mu.RLock() + lock.ch <- struct{}{} + lock.mu.RUnlock() + if l.timeout > 0 { + time.AfterFunc(l.timeout, func() { + l.Unlock(key) + }) + } +} + +func TestDoubleLockWhenGCDuringLock(t *testing.T) { + GCTime = time.Nanosecond + _DefaultTimeout := 5 * time.Second + + h := New[string](_DefaultTimeout) + t.Cleanup(func() { h.Close() }) + + h.LockHacked(t.Name()) + + // Verify the first lock timeout before we're able to lock again. + start := time.Now() + h.Lock(t.Name()) + require.GreaterOrEqual(t, time.Now().Sub(start), DefaultTimeout) +}