Skip to content

Commit

Permalink
Merge pull request #1439 from bnb-chain/develop
Browse files Browse the repository at this point in the history
release: v1.10.0
  • Loading branch information
annielz authored Nov 22, 2024
2 parents bf6fa8d + 1486c34 commit 042fcfb
Show file tree
Hide file tree
Showing 20 changed files with 325 additions and 79 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## v1.10.0
BUGFIXES
* [#1435](https://github.com/bnb-chain/greenfield-storage-provider/pull/1435) fix: gc objects

## v1.9.0
BUGFIXES
* [#1423](https://github.com/bnb-chain/greenfield-storage-provider/pull/1423) fix: no such object error code
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ install-go-test-coverage:
go install github.com/vladopajic/go-test-coverage/v2@latest

install-tools:
go install go.uber.org/mock/mockgen@latest
go install go.uber.org/mock/mockgen@v0.1.0
go install github.com/bufbuild/buf/cmd/[email protected]
go install github.com/cosmos/gogoproto/protoc-gen-gocosmos@latest

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Failure: plugin gocosmos: could not find protoc plugin for name gocosmos - pleas
GO111MODULE=on GOBIN=/usr/local/go/bin go install github.com/cosmos/gogoproto/protoc-gen-gocosmos@latest

# if you want to execute unit test of sp, you should execute the following command, assumed that you installed golang in /usr/local/go/bin. Other OS are similar.
GO111MODULE=on GOBIN=/usr/local/go/bin go install go.uber.org/mock/mockgen@latest
GO111MODULE=on GOBIN=/usr/local/go/bin go install go.uber.org/mock/mockgen@v0.1.0
```

Above error messages are due to users don't set go env correctly. More info users can search `GOROOT`, `GOPATH` and `GOBIN`.
Expand Down
2 changes: 1 addition & 1 deletion cmd/command/bs_data_migration/v1.0.1/fix_payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func FixPayment(endpoint string, db *gorm.DB) error {
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
httpErr = fmt.Errorf(resp.Status)
httpErr = fmt.Errorf("%s", resp.Status)
return
}
err := json.NewDecoder(resp.Body).Decode(&paymentResult)
Expand Down
2 changes: 1 addition & 1 deletion cmd/command/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (w *CMDWrapper) queryTasksAction(ctx *cli.Context) error {
fmt.Printf("failed to query task due to no task, endpoint:%v, key:%v\n", endpoint, key)
}
for _, info := range infos {
fmt.Printf(info + "\n")
fmt.Printf("%s"+"\n", info)
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions core/piecestore/piecestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ type PieceStore interface {
// DeletePiece deletes the piece data from piece store, it can delete
// segment or ec piece data.
DeletePiece(ctx context.Context, key string) error
// DeletePiecesByPrefix deletes pieces data from piece store, it can delete
// segment or ec piece data.
DeletePiecesByPrefix(ctx context.Context, key string) (uint64, error)
}
15 changes: 15 additions & 0 deletions core/piecestore/piecestore_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 25 additions & 29 deletions modular/executor/execute_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package executor
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -199,7 +201,6 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
currentGCBlockID uint64
currentGCObjectID uint64
responseEndBlockID uint64
storageParams *storagetypes.Params
gcObjectNumber int
tryAgainLater bool
taskIsCanceled bool
Expand Down Expand Up @@ -252,30 +253,18 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
}

for _, object := range waitingGCObjects {
if storageParams, err = e.baseApp.Consensus().QueryStorageParamsByTimestamp(
context.Background(), object.GetObjectInfo().GetCreateAt()); err != nil {
log.Errorw("failed to query storage params", "task_info", task.Info(), "error", err)
return
}

currentGCBlockID = uint64(object.GetDeleteAt())
objectInfo := object.GetObjectInfo()
objectVersion := objectInfo.Version
currentGCObjectID = objectInfo.Id.Uint64()
if currentGCBlockID < task.GetCurrentBlockNumber() {
log.Errorw("skip gc object", "object_info", objectInfo,
"task_current_gc_block_id", task.GetCurrentBlockNumber())
continue
}
segmentCount := e.baseApp.PieceOp().SegmentPieceCount(objectInfo.GetPayloadSize(),
storageParams.VersionedParams.GetMaxSegmentSize())
for segIdx := uint32(0); segIdx < segmentCount; segIdx++ {
pieceKey := e.baseApp.PieceOp().SegmentPieceKey(currentGCObjectID, segIdx, objectVersion)
// ignore this delete api error, TODO: refine gc workflow by enrich metadata index.
deleteErr := e.baseApp.PieceStore().DeletePiece(ctx, pieceKey)
log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo,
"piece_key", pieceKey, "error", deleteErr)
}
segmentPieceKeyPrefix := fmt.Sprintf("s%d_", currentGCObjectID)
deletedSize, deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, segmentPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo,
"piece_key_prefix", segmentPieceKeyPrefix, "deletedSize", deletedSize, "error", deleteErr)
bucketInfo, err := e.baseApp.GfSpClient().GetBucketInfoByBucketName(ctx, objectInfo.BucketName)
if err != nil || bucketInfo == nil {
log.Errorw("failed to get bucket by bucket name", "bucket_name", objectInfo.BucketName, "error", err)
Expand All @@ -288,23 +277,30 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
}

var redundancyIndex int32 = -1
for rIdx, sspId := range gvg.GetSecondarySpIds() {
if spId == sspId {
redundancyIndex = int32(rIdx)
for segIdx := uint32(0); segIdx < segmentCount; segIdx++ {
pieceKey := e.baseApp.PieceOp().ECPieceKey(currentGCObjectID, segIdx, uint32(rIdx), objectVersion)
if objectInfo.GetRedundancyType() == storagetypes.REDUNDANCY_REPLICA_TYPE {
pieceKey = e.baseApp.PieceOp().SegmentPieceKey(objectInfo.Id.Uint64(), segIdx, objectVersion)
}
// since in GC the object will be completely deleted, simply find all pieces with the piece key prefix and remove them
ECPieceKeyPrefix := fmt.Sprintf("e%d_", currentGCObjectID)
if len(gvg.GetSecondarySpIds()) != 0 {
for rIdx, sspId := range gvg.GetSecondarySpIds() {
if spId == sspId {
redundancyIndex = int32(rIdx)
// ignore this delete api error, TODO: refine gc workflow by enrich metadata index.
deleteErr := e.baseApp.PieceStore().DeletePiece(ctx, pieceKey)
log.CtxDebugw(ctx, "delete the secondary sp pieces",
"object_info", objectInfo, "piece_key", pieceKey, "error", deleteErr)
deletedSize, deleteErr = e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the secondary sp pieces by prefix",
"object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "deletedSize", deletedSize, "error", deleteErr)
}
}
} else {
// if failed to get secondary sps, check the current sp
deletedSize, deleteErr = e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the sp pieces by prefix in current sp when secondary sp not found",
"object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "deletedSize", deletedSize, "error", deleteErr)

// signal as delete any integrity meta related with the object
redundancyIndex = math.MaxInt32
}

// ignore this delete api error, TODO: refine gc workflow by enrich metadata index
deleteErr := e.baseApp.GfSpDB().DeleteObjectIntegrity(objectInfo.Id.Uint64(), redundancyIndex)
deleteErr = e.baseApp.GfSpDB().DeleteObjectIntegrity(objectInfo.Id.Uint64(), redundancyIndex)
log.CtxDebugw(ctx, "delete the object integrity meta", "object_info", objectInfo, "error", deleteErr)
task.SetCurrentBlockNumber(currentGCBlockID)
task.SetLastDeletedObjectId(currentGCObjectID)
Expand Down
45 changes: 3 additions & 42 deletions modular/executor/executor_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,32 +520,6 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
return e
},
},
{
name: "failed to query storage params",
task: &gfsptask.GfSpGCObjectTask{
Task: &gfsptask.GfSpTask{},
},
fn: func() *ExecuteModular {
e := setup(t)
ctrl := gomock.NewController(t)
m := gfspclient.NewMockGfSpClientAPI(ctrl)
waitingGCObjects := []*metadatatypes.Object{
{
ObjectInfo: &storagetypes.ObjectInfo{Id: sdkmath.NewUint(1)},
},
}
m.EXPECT().ListDeletedObjectsByBlockNumberRange(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(waitingGCObjects, uint64(0), nil).Times(1)
m.EXPECT().ReportTask(gomock.Any(), gomock.Any()).Return(nil).Times(1)
e.baseApp.SetGfSpClient(m)

m1 := consensus.NewMockConsensus(ctrl)
m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(nil, mockErr).Times(1)
m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1)
e.baseApp.SetConsensus(m1)
return e
},
},
{
name: "failed to get bucket by bucket name",
task: &gfsptask.GfSpGCObjectTask{
Expand All @@ -568,18 +542,14 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
e.baseApp.SetGfSpClient(m)

m1 := consensus.NewMockConsensus(ctrl)
m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(&storagetypes.Params{
VersionedParams: storagetypes.VersionedParams{MaxSegmentSize: 10}}, nil).Times(1)
m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1)
e.baseApp.SetConsensus(m1)

m2 := piecestore.NewMockPieceOp(ctrl)
m2.EXPECT().SegmentPieceCount(gomock.Any(), gomock.Any()).Return(uint32(1)).Times(1)
m2.EXPECT().SegmentPieceKey(gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1)
e.baseApp.SetPieceOp(m2)

m3 := piecestore.NewMockPieceStore(ctrl)
m3.EXPECT().DeletePiece(gomock.Any(), gomock.Any()).Return(nil).Times(1)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(uint64(0), nil).Times(1)
e.baseApp.SetPieceStore(m3)
return e
},
Expand Down Expand Up @@ -608,18 +578,14 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
e.baseApp.SetGfSpClient(m)

m1 := consensus.NewMockConsensus(ctrl)
m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(&storagetypes.Params{
VersionedParams: storagetypes.VersionedParams{MaxSegmentSize: 10}}, nil).Times(1)
m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1)
e.baseApp.SetConsensus(m1)

m2 := piecestore.NewMockPieceOp(ctrl)
m2.EXPECT().SegmentPieceCount(gomock.Any(), gomock.Any()).Return(uint32(1)).Times(1)
m2.EXPECT().SegmentPieceKey(gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1)
e.baseApp.SetPieceOp(m2)

m3 := piecestore.NewMockPieceStore(ctrl)
m3.EXPECT().DeletePiece(gomock.Any(), gomock.Any()).Return(nil).Times(1)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(uint64(0), nil).Times(1)
e.baseApp.SetPieceStore(m3)
return e
},
Expand Down Expand Up @@ -676,19 +642,14 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
e.baseApp.SetGfSpClient(m)

m1 := consensus.NewMockConsensus(ctrl)
m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(&storagetypes.Params{
VersionedParams: storagetypes.VersionedParams{MaxSegmentSize: 10}}, nil).Times(1)
m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1)
e.baseApp.SetConsensus(m1)

m2 := piecestore.NewMockPieceOp(ctrl)
m2.EXPECT().SegmentPieceCount(gomock.Any(), gomock.Any()).Return(uint32(1)).Times(1)
m2.EXPECT().SegmentPieceKey(gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1)
m2.EXPECT().ECPieceKey(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1)
e.baseApp.SetPieceOp(m2)

m3 := piecestore.NewMockPieceStore(ctrl)
m3.EXPECT().DeletePiece(gomock.Any(), gomock.Any()).Return(nil).Times(2)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(uint64(0), nil).Times(2)
e.baseApp.SetPieceStore(m3)

m4 := corespdb.NewMockSPDB(ctrl)
Expand Down
25 changes: 25 additions & 0 deletions store/piecestore/client/piece_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,28 @@ func (client *StoreClient) DeletePiece(ctx context.Context, key string) error {
err = client.ps.Delete(ctx, key)
return err
}

// DeletePiecesByPrefix deletes pieces by prefix from piece store.
func (client *StoreClient) DeletePiecesByPrefix(ctx context.Context, key string) (uint64, error) {
var (
startTime = time.Now()
err error
valSize uint64
)
defer func() {
if err != nil {
metrics.PieceStoreCounter.WithLabelValues(PieceStoreFailureDel).Inc()
metrics.PieceStoreTime.WithLabelValues(PieceStoreFailureDel).Observe(
time.Since(startTime).Seconds())
return
}
metrics.PieceStoreCounter.WithLabelValues(PieceStoreSuccessDel).Inc()
metrics.PieceStoreTime.WithLabelValues(PieceStoreSuccessDel).Observe(
time.Since(startTime).Seconds())
metrics.PieceStoreUsageAmountGauge.WithLabelValues(PieceStoreSuccessDel).Add(0 - float64(valSize))
}()

valSize, err = client.ps.DeleteByPrefix(ctx, key)

return valSize, err
}
6 changes: 6 additions & 0 deletions store/piecestore/piece/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type PieceAPI interface {
Get(ctx context.Context, key string, offset, limit int64) (io.ReadCloser, error)
Put(ctx context.Context, key string, reader io.Reader) error
Delete(ctx context.Context, key string) error
DeleteByPrefix(ctx context.Context, key string) (uint64, error)
}

type PieceStore struct {
Expand All @@ -35,6 +36,11 @@ func (p *PieceStore) Delete(ctx context.Context, key string) error {
return p.storeAPI.DeleteObject(ctx, key)
}

// DeleteByPrefix deletes several pieces in PieceStore and returns deleted size
func (p *PieceStore) DeleteByPrefix(ctx context.Context, key string) (uint64, error) {
return p.storeAPI.DeleteObjectsByPrefix(ctx, key)
}

// Head returns piece info in PieceStore
func (p *PieceStore) Head(ctx context.Context, key string) (storage.Object, error) {
return p.storeAPI.HeadObject(ctx, key)
Expand Down
15 changes: 15 additions & 0 deletions store/piecestore/piece/api_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions store/piecestore/storage/disk_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"io/fs"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -142,6 +143,44 @@ func (d *diskFileStore) DeleteObject(ctx context.Context, key string) error {
return err
}

func (d *diskFileStore) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) {
dirEntries, err := os.ReadDir(d.root)
if err != nil {
log.Errorw("DeleteObjectsByPrefix read directory error", "error", err)
return 0, err
}

var (
size uint64
entryInfo fs.FileInfo
)

for _, dirEntry := range dirEntries {
entryName := dirEntry.Name()
if strings.HasPrefix(entryName, key) {
var curInfoSize int64
// need to extract entry info and size first, otherwise when the object is deleted, the info can not be found
entryInfo, err = dirEntry.Info()
if entryInfo != nil {
curInfoSize = entryInfo.Size()
}
if err != nil {
log.Errorw("get dirEntry info error", "error", err)
}
err = d.DeleteObject(ctx, entryName)
if err != nil {
log.Errorw("remove single file by prefix error", "error", err)
} else {
if entryInfo != nil {
size += uint64(curInfoSize)
}
}
}
}

return size, nil
}

func (d *diskFileStore) HeadBucket(ctx context.Context) error {
if _, err := os.Stat(d.root); err != nil {
if os.IsNotExist(err) {
Expand Down
2 changes: 2 additions & 0 deletions store/piecestore/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type ObjectStorage interface {
PutObject(ctx context.Context, key string, reader io.Reader) error
// DeleteObject deletes an object
DeleteObject(ctx context.Context, key string) error
// DeleteObjectsByPrefix deletes objects by prefix
DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error)

// HeadBucket determines if a bucket exists and have permission to access it
HeadBucket(ctx context.Context) error
Expand Down
Loading

0 comments on commit 042fcfb

Please sign in to comment.