Skip to content

Commit

Permalink
delete blob on put (#4418)
Browse files Browse the repository at this point in the history
Co-authored-by: envestcc <[email protected]>
  • Loading branch information
CoderZhi and envestcc authored Oct 27, 2024
1 parent d32bf7d commit 1938d2a
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 102 deletions.
123 changes: 63 additions & 60 deletions blockchain/blockdao/blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package blockdao
import (
"context"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -22,7 +21,6 @@ import (
"github.com/iotexproject/iotex-core/v2/blockchain/block"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/routine"
)

var (
Expand All @@ -34,6 +32,7 @@ var (
)

type (
// BlobStore defines the interface of blob store
BlobStore interface {
Start(context.Context) error
Stop(context.Context) error
Expand Down Expand Up @@ -62,42 +61,27 @@ type (
// entire blob storage is 786kB x 311040 = 245GB.
//
blobStore struct {
kvStore db.KVStore
totalBlocks uint64
currWriteBlock, currExpireBlock uint64
purgeTask *routine.RecurringTask
interval time.Duration
kvStore db.KVStore
totalBlocks uint64
currWriteBlock uint64
}
)

func NewBlobStore(kv db.KVStore, size uint64, interval time.Duration) *blobStore {
func NewBlobStore(kv db.KVStore, size uint64) *blobStore {
return &blobStore{
kvStore: kv,
totalBlocks: size,
interval: interval,
}
}

func (bs *blobStore) Start(ctx context.Context) error {
if err := bs.kvStore.Start(ctx); err != nil {
return err
}
if err := bs.checkDB(); err != nil {
return err
}
if bs.interval != 0 {
bs.purgeTask = routine.NewRecurringTask(func() {
bs.expireBlob(atomic.LoadUint64(&bs.currWriteBlock))
}, bs.interval)
return bs.purgeTask.Start(ctx)
}
return nil
return bs.checkDB()
}

func (bs *blobStore) Stop(ctx context.Context) error {
if err := bs.purgeTask.Stop(ctx); err != nil {
return err
}
return bs.kvStore.Stop(ctx)
}

Expand All @@ -107,10 +91,6 @@ func (bs *blobStore) checkDB() error {
if err != nil && errors.Cause(err) != db.ErrNotExist {
return err
}
bs.currExpireBlock, err = bs.getHeightByHash(_expireHeight)
if err != nil && errors.Cause(err) != db.ErrNotExist {
return err
}
// in case the retention window size has shrunk, do a one-time purge
return bs.expireBlob(bs.currWriteBlock)
}
Expand Down Expand Up @@ -150,8 +130,9 @@ func (bs *blobStore) getBlobs(height uint64) ([]*types.BlobTxSidecar, []string,
}

func (bs *blobStore) PutBlock(blk *block.Block) error {
if blk.Height() <= atomic.LoadUint64(&bs.currWriteBlock) {
return errors.Errorf("block height %d is less than current tip height", blk.Height())
height := blk.Height()
if height <= atomic.LoadUint64(&bs.currWriteBlock) {
return errors.Errorf("block height %d is less than current tip height", height)
}
pb := iotextypes.BlobTxSidecars{
TxHash: make([][]byte, 0),
Expand All @@ -167,17 +148,44 @@ func (bs *blobStore) PutBlock(blk *block.Block) error {
pb.Sidecars = append(pb.Sidecars, action.ToProtoSideCar(b))
}
}
raw, err := proto.Marshal(&pb)
if err != nil {
return errors.Wrapf(err, "failed to put block = %d", blk.Height())
var (
raw []byte
err error
)
if len(pb.Sidecars) != 0 {
raw, err = proto.Marshal(&pb)
if err != nil {
return errors.Wrapf(err, "failed to put block = %d", height)
}
}
b := batch.NewBatch()
if raw != nil {
bs.putBlob(raw, height, pb.TxHash, b)
}
if height >= bs.totalBlocks {
k := keyForBlock(height - bs.totalBlocks)
v, err := bs.kvStore.Get(_heightIndexNS, k)
if err != nil {
if errors.Cause(err) != db.ErrNotExist {
return err
}
} else {
if err := bs.deleteBlob(k, v, b); err != nil {
return errors.Wrapf(err, "failed to delete blob")
}
}
}
if err := bs.kvStore.WriteBatch(b); err != nil {
return errors.Wrapf(err, "failed to write batch")
}
return bs.putBlob(raw, blk.Height(), pb.TxHash)
atomic.StoreUint64(&bs.currWriteBlock, height)

return nil
}

func (bs *blobStore) putBlob(blob []byte, height uint64, txHash [][]byte) error {
func (bs *blobStore) putBlob(blob []byte, height uint64, txHash [][]byte, b batch.KVStoreBatch) {
// write blob index
var (
b = batch.NewBatch()
key = keyForBlock(height)
index = blobIndex{hashes: txHash}
)
Expand All @@ -189,10 +197,20 @@ func (bs *blobStore) putBlob(blob []byte, height uint64, txHash [][]byte) error
// write the blob data and height
b.Put(_blobDataNS, key, blob, "failed to put blob")
b.Put(_hashHeightNS, _writeHeight, key, "failed to put write height")
if err := bs.kvStore.WriteBatch(b); err != nil {
}

func (bs *blobStore) deleteBlob(k []byte, v []byte, b batch.KVStoreBatch) error {
dx, err := deserializeBlobIndex(v)
if err != nil {
return err
}
atomic.StoreUint64(&bs.currWriteBlock, height)
// delete tx hash of expired blobs
for i := range dx.hashes {
b.Delete(_hashHeightNS, dx.hashes[i], "failed to delete hash")
}
b.Delete(_heightIndexNS, k, "failed to delete index")
b.Delete(_blobDataNS, k, "failed to delete blob")

return nil
}

Expand All @@ -201,41 +219,26 @@ func (bs *blobStore) expireBlob(height uint64) error {
b = batch.NewBatch()
toExpireHeight = height - bs.totalBlocks
)
if height <= bs.totalBlocks || toExpireHeight <= bs.currExpireBlock {
if height <= bs.totalBlocks {
return nil
}
ek, ev, err := bs.kvStore.Filter(_heightIndexNS, func(k, v []byte) bool { return true },
keyForBlock(bs.currExpireBlock), keyForBlock(toExpireHeight))
ek, ev, err := bs.kvStore.Filter(_heightIndexNS, func(k, v []byte) bool {
return byteutil.BytesToUint64BigEndian(k) <= toExpireHeight
}, nil, nil)
if errors.Cause(err) == db.ErrNotExist {
// no blobs are stored up to the new expire height
bs.currExpireBlock = toExpireHeight
return nil
}
if err != nil {
return err
}
for _, v := range ev {
dx, err := deserializeBlobIndex(v)
if err != nil {
return err
}
// delete tx hash of expired blobs
for i := range dx.hashes {
b.Delete(_hashHeightNS, dx.hashes[i], "failed to delete hash")
for i, key := range ek {
v := ev[i]
if err := bs.deleteBlob(key, v, b); err != nil {
return errors.Wrapf(err, "failed to delete blob")
}
}
for i := range ek {
// delete index expired blob data
b.Delete(_heightIndexNS, ek[i], "failed to delete index")
b.Delete(_blobDataNS, ek[i], "failed to delete blob")
}
// update expired blob height
b.Put(_hashHeightNS, _expireHeight, keyForBlock(toExpireHeight), "failed to put expired height")
if err := bs.kvStore.WriteBatch(b); err != nil {
return err
}
bs.currExpireBlock = toExpireHeight
return nil
return bs.kvStore.WriteBatch(b)
}

func decodeBlob(raw []byte) ([]*types.BlobTxSidecar, []string, error) {
Expand Down
41 changes: 6 additions & 35 deletions blockchain/blockdao/blob_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"context"
"encoding/hex"
"testing"
"time"

"github.com/iotexproject/go-pkgs/crypto"
"github.com/iotexproject/go-pkgs/hash"
"github.com/stretchr/testify/require"

"github.com/iotexproject/iotex-core/v2/blockchain/block"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/compress"
. "github.com/iotexproject/iotex-core/v2/pkg/util/assertions"
"github.com/iotexproject/iotex-core/v2/testutil"
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestBlobStore(t *testing.T) {
cfg := db.DefaultConfig
cfg.DbPath = testPath
kvs := db.NewBoltDB(cfg)
bs := NewBlobStore(kvs, 24, time.Second)
bs := NewBlobStore(kvs, 24)
r.NoError(bs.Start(ctx))
var (
_value [7][]byte = [7][]byte{
Expand All @@ -61,9 +61,10 @@ func TestBlobStore(t *testing.T) {
}
)
for i, height := range []uint64{0, 3, 5, 10, 13, 18, 29, 37} {
b := batch.NewBatch()
hashes := createTestHash(i, height)
r.NoError(bs.putBlob(_value[i%7], height, hashes))
r.Equal(height, bs.currWriteBlock)
bs.putBlob(_value[i%7], height, hashes, b)
r.NoError(bs.kvStore.WriteBatch(b))
raw, err := bs.kvStore.Get(_heightIndexNS, keyForBlock(height))
r.NoError(err)
index, err := deserializeBlobIndex(raw)
Expand All @@ -78,40 +79,10 @@ func TestBlobStore(t *testing.T) {
r.NoError(err)
r.Equal(_value[i%7], v)
}
time.Sleep(time.Second * 3 / 2)
// slot 0 - 13 has expired
for i, height := range []uint64{0, 3, 5, 10, 13, 18, 29, 37} {
hashes := createTestHash(i, height)
raw, err := bs.kvStore.Get(_heightIndexNS, keyForBlock(height))
if i <= 4 {
r.ErrorIs(err, db.ErrNotExist)
} else {
index, err := deserializeBlobIndex(raw)
r.NoError(err)
r.Equal(index.hashes, hashes)
}
for j := range hashes {
h, err := bs.getHeightByHash(hashes[j])
if i <= 4 {
r.ErrorIs(err, db.ErrNotExist)
} else {
r.NoError(err)
r.Equal(height, h)
}
}
v, err := bs.kvStore.Get(_blobDataNS, keyForBlock(height))
if i <= 4 {
r.ErrorIs(err, db.ErrNotExist)
} else {
r.NoError(err)
r.Equal(_value[i%7], v)
}
}
// verify write and expire block
r.NoError(bs.Stop(ctx))
r.NoError(bs.Start(ctx))
r.EqualValues(37, bs.currWriteBlock)
r.EqualValues(13, bs.currExpireBlock)
r.NoError(bs.Stop(ctx))
})
t.Run("PutBlock", func(t *testing.T) {
Expand All @@ -124,7 +95,7 @@ func TestBlobStore(t *testing.T) {
cfg := db.DefaultConfig
cfg.DbPath = testPath
kvs := db.NewBoltDB(cfg)
bs := NewBlobStore(kvs, 24, time.Second)
bs := NewBlobStore(kvs, 24)
testPath1, err := testutil.PathOfTempFile("test-blob-store")
r.NoError(err)
cfg.DbPath = testPath1
Expand Down
2 changes: 1 addition & 1 deletion blockchain/blockdao/blockdao.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (dao *blockDAO) PutBlock(ctx context.Context, blk *block.Block) error {
timer.End()
return err
}
if dao.blobStore != nil && blk.HasBlob() {
if dao.blobStore != nil {
if err := dao.blobStore.PutBlock(blk); err != nil {
timer.End()
return err
Expand Down
6 changes: 4 additions & 2 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,10 @@ func (builder *Builder) buildBlockDAO(forTest bool) error {
if bsPath := cfg.Chain.BlobStoreDBPath; len(bsPath) > 0 {
blocksPerHour := time.Hour / cfg.DardanellesUpgrade.BlockInterval
dbConfig.DbPath = bsPath
blobStore = blockdao.NewBlobStore(db.NewBoltDB(dbConfig),
uint64(blocksPerHour)*uint64(cfg.Chain.BlobStoreRetentionDays)*24, cfg.Chain.BlobPurgeInterval)
blobStore = blockdao.NewBlobStore(
db.NewBoltDB(dbConfig),
uint64(blocksPerHour)*uint64(cfg.Chain.BlobStoreRetentionDays)*24,
)
opts = append(opts, blockdao.WithBlobStore(blobStore))
}
}
Expand Down
12 changes: 12 additions & 0 deletions e2etest/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
_dBPath2 = "db.test2"
_triePath = "trie.test"
_triePath2 = "trie.test2"
_blobPath = "blob.test"
_disabledIP = "169.254."
)

Expand Down Expand Up @@ -363,6 +364,8 @@ func TestLocalSync(t *testing.T) {
require.NoError(err)
indexDBPath2, err := testutil.PathOfTempFile(_dBPath2)
require.NoError(err)
blobIndexPath, err := testutil.PathOfTempFile(_blobPath)
require.NoError(err)
contractIndexDBPath2, err := testutil.PathOfTempFile(_dBPath2)
require.NoError(err)

Expand All @@ -374,11 +377,13 @@ func TestLocalSync(t *testing.T) {
cfg.Chain.TrieDBPath = testTriePath2
cfg.Chain.ChainDBPath = testDBPath2
cfg.Chain.IndexDBPath = indexDBPath2
cfg.Chain.BlobStoreDBPath = blobIndexPath
cfg.Chain.ContractStakingIndexDBPath = contractIndexDBPath2
defer func() {
testutil.CleanupPath(testTriePath2)
testutil.CleanupPath(testDBPath2)
testutil.CleanupPath(indexDBPath2)
testutil.CleanupPath(blobIndexPath)
testutil.CleanupPath(contractIndexDBPath2)
}()

Expand Down Expand Up @@ -430,13 +435,16 @@ func TestStartExistingBlockchain(t *testing.T) {
require.NoError(err)
testContractStakeIndexPath, err := testutil.PathOfTempFile(_dBPath)
require.NoError(err)
testBlobIndexPath, err := testutil.PathOfTempFile(_blobPath)
require.NoError(err)
// Disable block reward to make bookkeeping easier
cfg := config.Default
cfg.Chain.TrieDBPatchFile = ""
cfg.Chain.BlobStoreDBPath = ""
cfg.Chain.TrieDBPath = testTriePath
cfg.Chain.ChainDBPath = testDBPath
cfg.Chain.IndexDBPath = testIndexPath
cfg.Chain.BlobStoreDBPath = testBlobIndexPath
cfg.Chain.ContractStakingIndexDBPath = testContractStakeIndexPath
cfg.Chain.EnableAsyncIndexWrite = false
cfg.ActPool.MinGasPriceStr = "0"
Expand All @@ -458,6 +466,7 @@ func TestStartExistingBlockchain(t *testing.T) {
testutil.CleanupPath(testTriePath)
testutil.CleanupPath(testDBPath)
testutil.CleanupPath(testIndexPath)
testutil.CleanupPath(testBlobIndexPath)
testutil.CleanupPath(testContractStakeIndexPath)
}()

Expand Down Expand Up @@ -540,6 +549,9 @@ func TestStartExistingBlockchain(t *testing.T) {

func newTestConfig() (config.Config, error) {
cfg := config.Default
cfg.Chain.TrieDBPath = _triePath
cfg.Chain.ChainDBPath = _dBPath
cfg.Chain.BlobStoreDBPath = _blobPath
cfg.ActPool.MinGasPriceStr = "0"
cfg.Consensus.Scheme = config.NOOPScheme
cfg.Network.Port = testutil.RandomPort()
Expand Down
Loading

0 comments on commit 1938d2a

Please sign in to comment.