Skip to content

Commit

Permalink
[factory] implement versioned stateDB to support archive mode
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Dec 11, 2024
1 parent 6ac9264 commit 3168caa
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 40 deletions.
5 changes: 5 additions & 0 deletions blockchain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type (
EnableStateDBCaching bool `yaml:"enableStateDBCaching"`
// EnableArchiveMode is only meaningful when EnableTrielessStateDB is false
EnableArchiveMode bool `yaml:"enableArchiveMode"`
// VersionedNamespaces specifies the versioned namespaces for versioned state DB
VersionedNamespaces []string `yaml:"versionedNamespaces"`
// VersionedMetadata specifies the metadata namespace for versioned state DB
VersionedMetadata string `yaml:"versionedMetadata"`
// EnableAsyncIndexWrite enables writing the block actions' and receipts' index asynchronously
EnableAsyncIndexWrite bool `yaml:"enableAsyncIndexWrite"`
// deprecated
Expand Down Expand Up @@ -107,6 +111,7 @@ var (
GravityChainAPIs: []string{},
},
EnableTrielessStateDB: true,
VersionedNamespaces: []string{},
EnableStateDBCaching: false,
EnableArchiveMode: false,
EnableAsyncIndexWrite: true,
Expand Down
5 changes: 4 additions & 1 deletion chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ func (builder *Builder) createFactory(forTest bool) (factory.Factory, error) {
factory.RegistryStateDBOption(builder.cs.registry),
factory.DefaultPatchOption(),
}
if builder.cfg.Chain.EnableStateDBCaching {
if builder.cfg.Chain.EnableArchiveMode {
dao, err = db.CreateKVStoreVersioned(factoryDBCfg, builder.cfg.Chain.TrieDBPath, builder.cfg.Chain.VersionedNamespaces)
opts = append(opts, factory.MetadataNamespaceOption(builder.cfg.Chain.VersionedMetadata))
} else if builder.cfg.Chain.EnableStateDBCaching {
dao, err = db.CreateKVStoreWithCache(factoryDBCfg, builder.cfg.Chain.TrieDBPath, builder.cfg.Chain.StateDBCacheSize)
} else {
dao, err = db.CreateKVStore(factoryDBCfg, builder.cfg.Chain.TrieDBPath)
Expand Down
9 changes: 9 additions & 0 deletions db/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,12 @@ func CreateKVStoreWithCache(cfg Config, dbPath string, cacheSize int) (KVStore,

return NewKvStoreWithCache(dao, cacheSize), nil
}

// CreateKVStoreVersioned creates versioned db from config and db path
func CreateKVStoreVersioned(cfg Config, dbPath string, vns []string) (KVStore, error) {
if len(dbPath) == 0 {
return nil, ErrEmptyDBPath
}
cfg.DbPath = dbPath
return NewKVStoreWithVersion(cfg, VersionedNamespaceOption(vns...)), nil
}
98 changes: 71 additions & 27 deletions state/factory/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ type stateDB struct {
cfg Config
registry *protocol.Registry
dao db.KVStore // the underlying DB for account/contract storage
daoVersioned db.KvVersioned
timerFactory *prometheustimer.TimerFactory
workingsets cache.LRUCache // lru cache for workingsets
protocolView protocol.View
skipBlockValidationOnPut bool
versioned bool
metaNS string // metadata namespace for versioned DB
ps *patchStore
}

Expand Down Expand Up @@ -82,22 +85,39 @@ func DisableWorkingSetCacheOption() StateDBOption {
}
}

// MetadataNamespaceOption specifies the metadat namespace for versioned DB
func MetadataNamespaceOption(ns string) StateDBOption {
return func(sdb *stateDB, cfg *Config) error {
sdb.metaNS = ns
return nil
}
}

// NewStateDB creates a new state db
func NewStateDB(cfg Config, dao db.KVStore, opts ...StateDBOption) (Factory, error) {
sdb := stateDB{
cfg: cfg,
currentChainHeight: 0,
versioned: cfg.Chain.EnableArchiveMode,
registry: protocol.NewRegistry(),
protocolView: protocol.View{},
workingsets: cache.NewThreadSafeLruCache(int(cfg.Chain.WorkingSetCacheSize)),
dao: dao,
}
for _, opt := range opts {
if err := opt(&sdb, &cfg); err != nil {
log.S().Errorf("Failed to execute state factory creation option %p: %v", opt, err)
return nil, err
}
}
if sdb.versioned {
daoVersioned, ok := dao.(db.KvVersioned)
if !ok {
return nil, errors.Wrap(ErrNotSupported, "cannot enable archive mode StateDB with non-versioned DB")
}
sdb.daoVersioned = daoVersioned
} else {
sdb.dao = dao
}
timerFactory, err := prometheustimer.New(
"iotex_statefactory_perf",
"Performance of state factory module",
Expand All @@ -111,23 +131,30 @@ func NewStateDB(cfg Config, dao db.KVStore, opts ...StateDBOption) (Factory, err
return &sdb, nil
}

func (sdb *stateDB) DAO(height uint64) db.KVStore {
if sdb.versioned {
return sdb.daoVersioned.SetVersion(height)
}
return sdb.dao
}

func (sdb *stateDB) Start(ctx context.Context) error {
ctx = protocol.WithRegistry(ctx, sdb.registry)
if err := sdb.dao.Start(ctx); err != nil {
if err := sdb.DAO(0).Start(ctx); err != nil {
return err
}
// check factory height
h, err := sdb.dao.Get(AccountKVNamespace, []byte(CurrentHeightKey))
h, err := sdb.getHeight()
switch errors.Cause(err) {
case nil:
sdb.currentChainHeight = byteutil.BytesToUint64(h)
sdb.currentChainHeight = h
// start all protocols
if sdb.protocolView, err = sdb.registry.StartAll(ctx, sdb); err != nil {
return err
}
case db.ErrNotExist:
sdb.currentChainHeight = 0
if err = sdb.dao.Put(AccountKVNamespace, []byte(CurrentHeightKey), byteutil.Uint64ToBytes(0)); err != nil {
if err = sdb.putHeight(0); err != nil {
return errors.Wrap(err, "failed to init statedb's height")
}
// start all protocols
Expand Down Expand Up @@ -158,24 +185,46 @@ func (sdb *stateDB) Stop(ctx context.Context) error {
sdb.mutex.Lock()
defer sdb.mutex.Unlock()
sdb.workingsets.Clear()
return sdb.dao.Stop(ctx)
return sdb.DAO(0).Stop(ctx)
}

// Height returns factory's height
func (sdb *stateDB) Height() (uint64, error) {
sdb.mutex.RLock()
defer sdb.mutex.RUnlock()
height, err := sdb.dao.Get(AccountKVNamespace, []byte(CurrentHeightKey))
return sdb.getHeight()
}

func (sdb *stateDB) getHeight() (uint64, error) {
height, err := sdb.DAO(0).Get(sdb.metadataNS(), []byte(CurrentHeightKey))
if err != nil {
return 0, errors.Wrap(err, "failed to get factory's height from underlying DB")
}
return byteutil.BytesToUint64(height), nil
}

func (sdb *stateDB) putHeight(h uint64) error {
return sdb.DAO(h).Put(sdb.metadataNS(), []byte(CurrentHeightKey), byteutil.Uint64ToBytes(h))
}

func (sdb *stateDB) metadataNS() string {
if sdb.versioned {
return sdb.metaNS
}
return AccountKVNamespace
}

func (sdb *stateDB) transCurrentHeight(wi *batch.WriteInfo) *batch.WriteInfo {
if wi.Namespace() == sdb.metaNS && string(wi.Key()) == CurrentHeightKey && wi.WriteType() == batch.Put {
return batch.NewWriteInfo(wi.WriteType(), AccountKVNamespace, wi.Key(), wi.Value(), wi.Error())
}
return wi
}

func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingSet, error) {
g := genesis.MustExtractGenesisContext(ctx)
flusher, err := db.NewKVStoreFlusher(
sdb.dao,
sdb.DAO(height),
batch.NewCachedBatch(),
sdb.flusherOptions(!g.IsEaster(height))...,
)
Expand All @@ -189,7 +238,7 @@ func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingS
flusher.KVStoreWithBuffer().MustPut(p.Namespace, p.Key, p.Value)
}
}
store := newStateDBWorkingSetStore(sdb.protocolView, flusher, g.IsNewfoundland(height))
store := newStateDBWorkingSetStore(sdb.protocolView, flusher, g.IsNewfoundland(height), sdb.metadataNS())
if err := store.Start(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -267,7 +316,6 @@ func (sdb *stateDB) WorkingSet(ctx context.Context) (protocol.StateManager, erro
}

func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) {
// TODO: implement archive mode
return sdb.newWorkingSet(ctx, height)
}

Expand Down Expand Up @@ -327,12 +375,13 @@ func (sdb *stateDB) State(s interface{}, opts ...protocol.StateOption) (uint64,
if err != nil {
return 0, err
}
sdb.mutex.RLock()
defer sdb.mutex.RUnlock()
if cfg.Keys != nil {
return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
return sdb.currentChainHeight, sdb.state(cfg.Namespace, cfg.Key, s)
sdb.mutex.RLock()
height := sdb.currentChainHeight
sdb.mutex.RUnlock()
return height, sdb.state(height, cfg.Namespace, cfg.Key, s)
}

// State returns a set of states in the state factory
Expand All @@ -346,7 +395,7 @@ func (sdb *stateDB) States(opts ...protocol.StateOption) (uint64, state.Iterator
if cfg.Key != nil {
return sdb.currentChainHeight, nil, errors.Wrap(ErrNotSupported, "Read states with key option has not been implemented yet")
}
keys, values, err := readStates(sdb.dao, cfg.Namespace, cfg.Keys)
keys, values, err := readStates(sdb.DAO(sdb.currentChainHeight), cfg.Namespace, cfg.Keys)
if err != nil {
return 0, nil, err
}
Expand All @@ -358,28 +407,23 @@ func (sdb *stateDB) States(opts ...protocol.StateOption) (uint64, state.Iterator
return sdb.currentChainHeight, iter, nil
}

// StateAtHeight returns a confirmed state at height -- archive mode
func (sdb *stateDB) StateAtHeight(height uint64, s interface{}, opts ...protocol.StateOption) error {
return ErrNotSupported
}

// StatesAtHeight returns a set states in the state factory at height -- archive mode
func (sdb *stateDB) StatesAtHeight(height uint64, opts ...protocol.StateOption) (state.Iterator, error) {
return nil, errors.Wrap(ErrNotSupported, "state db does not support archive mode")
}

// ReadView reads the view
func (sdb *stateDB) ReadView(name string) (interface{}, error) {
return sdb.protocolView.Read(name)
}

//======================================
// private trie constructor functions
// private statedb functions
//======================================

func (sdb *stateDB) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
opts := []db.KVStoreFlusherOption{
db.SerializeOption(func(wi *batch.WriteInfo) []byte {
if sdb.versioned {
// current height is moved to another namespace
// transform it back for the purpose of calculating digest
wi = sdb.transCurrentHeight(wi)
}
if preEaster {
return wi.SerializeWithoutWriteType()
}
Expand All @@ -397,8 +441,8 @@ func (sdb *stateDB) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
)
}

func (sdb *stateDB) state(ns string, addr []byte, s interface{}) error {
data, err := sdb.dao.Get(ns, addr)
func (sdb *stateDB) state(h uint64, ns string, addr []byte, s interface{}) error {
data, err := sdb.DAO(h).Get(ns, addr)
if err != nil {
if errors.Cause(err) == db.ErrNotExist {
return errors.Wrapf(state.ErrStateNotExist, "state of %x doesn't exist", addr)
Expand Down
6 changes: 4 additions & 2 deletions state/factory/workingsetstore_statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ import (
type stateDBWorkingSetStore struct {
*workingSetStoreCommon
readBuffer bool
metaNS string // metadata namespace for versioned DB
}

func newStateDBWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher, readBuffer bool) workingSetStore {
func newStateDBWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher, readBuffer bool, ns string) workingSetStore {
return &stateDBWorkingSetStore{
workingSetStoreCommon: &workingSetStoreCommon{
flusher: flusher,
view: view,
},
readBuffer: readBuffer,
metaNS: ns,
}
}

Expand Down Expand Up @@ -61,7 +63,7 @@ func (store *stateDBWorkingSetStore) States(ns string, keys [][]byte) ([][]byte,
func (store *stateDBWorkingSetStore) Finalize(height uint64) error {
// Persist current chain Height
store.flusher.KVStoreWithBuffer().MustPut(
AccountKVNamespace,
store.metaNS,
[]byte(CurrentHeightKey),
byteutil.Uint64ToBytes(height),
)
Expand Down
61 changes: 51 additions & 10 deletions state/factory/workingsetstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,35 @@ import (
"testing"

"github.com/iotexproject/iotex-core/v2/action/protocol"
"github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
"github.com/stretchr/testify/require"
)

var (
name = "name"
viewValue = "value"
namespace = "namespace"
key1 = []byte("key1")
value1 = []byte("value1")
key2 = []byte("key2")
value2 = []byte("value2")
key3 = []byte("key3")
value3 = []byte("value3")
)

func TestStateDBWorkingSetStore(t *testing.T) {
require := require.New(t)
ctx := context.Background()
view := protocol.View{}
inMemStore := db.NewMemKVStore()
flusher, err := db.NewKVStoreFlusher(inMemStore, batch.NewCachedBatch())
require.NoError(err)
store := newStateDBWorkingSetStore(view, flusher, true)
store := newStateDBWorkingSetStore(view, flusher, true, AccountKVNamespace)
require.NotNil(store)
require.NoError(store.Start(ctx))
name := "name"
viewValue := "value"
namespace := "namespace"
key1 := []byte("key1")
value1 := []byte("value1")
key2 := []byte("key2")
value2 := []byte("value2")
key3 := []byte("key3")
value3 := []byte("value3")
t.Run("test view", func(t *testing.T) {
_, err := store.ReadView(name)
require.Error(err)
Expand Down Expand Up @@ -109,6 +113,43 @@ func TestStateDBWorkingSetStore(t *testing.T) {
require.NoError(store.Stop(ctx))
}

func TestVersionedWorkingSetStore(t *testing.T) {
r := require.New(t)
var (
mns = "mta"
stores = []workingSetStore{}
digest = [2]string{
"e6958faedcc37528dad9ac99f5e6613fbefbf403a06fe962535225d42a27b189",
"bb262ac0603e48aa737f5eb42014f481cb54d831c14fe736b8f61b69e5b4924a",
}
)
for _, preEaster := range []bool{false, true} {
for _, versioned := range []bool{true, false} {
for _, ns := range []string{mns, "test1", "test can pass with any string here"} {
sdb := stateDB{
versioned: versioned,
metaNS: ns,
}
flusher, err := db.NewKVStoreFlusher(db.NewMemKVStore(), batch.NewCachedBatch(), sdb.flusherOptions(preEaster)...)
r.NoError(err)
stores = append(stores, newStateDBWorkingSetStore(nil, flusher, true, sdb.metadataNS()))
}
}
}
for i, store := range stores {
r.NotNil(store)
r.NoError(store.Put(namespace, key1, value1))
r.NoError(store.Put(namespace, key2, value2))
r.NoError(store.Put(namespace, []byte(CurrentHeightKey), value3))
r.NoError(store.Put(mns, key1, value1))
r.NoError(store.Delete(namespace, key2))
r.NoError(store.Put(evm.CodeKVNameSpace, key3, value1))
r.NoError(store.Finalize(3))
h := store.Digest()
r.Equal(digest[i/6], hex.EncodeToString(h[:]))
}
}

func TestFactoryWorkingSetStore(t *testing.T) {
// TODO: add unit test for factory working set store
}

0 comments on commit 3168caa

Please sign in to comment.