From 1c93d4f3f46b0f93991db85f4c4b286e2135ef22 Mon Sep 17 00:00:00 2001 From: Dustin Xie Date: Fri, 26 Apr 2024 18:02:35 -0700 Subject: [PATCH] [factory] implement versioned stateDB to support archive mode --- blockchain/config.go | 5 ++ chainservice/builder.go | 5 +- config/config.go | 17 +++-- config/config_test.go | 48 +++++++++---- db/builder.go | 9 +++ db/kvstore_versioned.go | 25 +++++++ state/factory/daoretrofitter.go | 27 ++++++++ state/factory/daoretrofitter_archive.go | 88 ++++++++++++++++++++++++ state/factory/statedb.go | 51 +++++++------- state/factory/workingsetstore_statedb.go | 6 +- state/factory/workingsetstore_test.go | 63 ++++++++++++++--- 11 files changed, 286 insertions(+), 58 deletions(-) create mode 100644 state/factory/daoretrofitter_archive.go diff --git a/blockchain/config.go b/blockchain/config.go index 8993e92adc..a0731c796d 100644 --- a/blockchain/config.go +++ b/blockchain/config.go @@ -52,6 +52,10 @@ type ( EnableStateDBCaching bool `yaml:"enableStateDBCaching"` // EnableArchiveMode is only meaningful when EnableTrielessStateDB is false EnableArchiveMode bool `yaml:"enableArchiveMode"` + // VersionedNamespaces specifies the versioned namespaces for versioned state DB + VersionedNamespaces []string `yaml:"versionedNamespaces"` + // VersionedMetadata specifies the metadata namespace for versioned state DB + VersionedMetadata string `yaml:"versionedMetadata"` // EnableAsyncIndexWrite enables writing the block actions' and receipts' index asynchronously EnableAsyncIndexWrite bool `yaml:"enableAsyncIndexWrite"` // deprecated @@ -107,6 +111,7 @@ var ( GravityChainAPIs: []string{}, }, EnableTrielessStateDB: true, + VersionedNamespaces: []string{}, EnableStateDBCaching: false, EnableArchiveMode: false, EnableAsyncIndexWrite: true, diff --git a/chainservice/builder.go b/chainservice/builder.go index 69664e37f9..0b4f51448d 100644 --- a/chainservice/builder.go +++ b/chainservice/builder.go @@ -173,7 +173,10 @@ func (builder *Builder) createFactory(forTest bool) (factory.Factory, error) { factory.RegistryStateDBOption(builder.cs.registry), factory.DefaultPatchOption(), } - if builder.cfg.Chain.EnableStateDBCaching { + if builder.cfg.Chain.EnableArchiveMode { + dao, err = db.CreateKVStoreVersioned(factoryDBCfg, builder.cfg.Chain.TrieDBPath, builder.cfg.Chain.VersionedNamespaces) + opts = append(opts, factory.MetadataNamespaceOption(builder.cfg.Chain.VersionedMetadata)) + } else if builder.cfg.Chain.EnableStateDBCaching { dao, err = db.CreateKVStoreWithCache(factoryDBCfg, builder.cfg.Chain.TrieDBPath, builder.cfg.Chain.StateDBCacheSize) } else { dao, err = db.CreateKVStore(factoryDBCfg, builder.cfg.Chain.TrieDBPath) diff --git a/config/config.go b/config/config.go index 7a50fd5095..7b92cf9f9e 100644 --- a/config/config.go +++ b/config/config.go @@ -256,11 +256,20 @@ func ValidateRollDPoS(cfg Config) error { // ValidateArchiveMode validates the state factory setting func ValidateArchiveMode(cfg Config) error { - if !cfg.Chain.EnableArchiveMode || !cfg.Chain.EnableTrielessStateDB { - return nil + if cfg.Chain.EnableArchiveMode && cfg.Chain.EnableTrielessStateDB { + if len(cfg.Chain.VersionedMetadata) == 0 { + return errors.Wrap(ErrInvalidCfg, "State DB archive mode is enabled with empty metadata namespace") + } + if len(cfg.Chain.VersionedNamespaces) == 0 { + return errors.Wrap(ErrInvalidCfg, "State DB archive mode is enabled with empty versioned namespace") + } + for _, v := range cfg.Chain.VersionedNamespaces { + if len(v) == 0 { + return errors.Wrap(ErrInvalidCfg, "State DB archive mode is enabled with empty versioned namespace") + } + } } - - return errors.Wrap(ErrInvalidCfg, "Archive mode is incompatible with trieless state DB") + return nil } // ValidateAPI validates the api configs diff --git a/config/config_test.go b/config/config_test.go index 485da09d00..6c62b47bf9 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -254,20 +254,42 @@ func TestValidateRollDPoS(t *testing.T) { } func TestValidateArchiveMode(t *testing.T) { + r := require.New(t) cfg := Default - cfg.Chain.EnableArchiveMode = true - cfg.Chain.EnableTrielessStateDB = true - require.Error(t, ErrInvalidCfg, errors.Cause(ValidateArchiveMode(cfg))) - require.EqualError(t, ValidateArchiveMode(cfg), "Archive mode is incompatible with trieless state DB: invalid config value") - cfg.Chain.EnableArchiveMode = false - cfg.Chain.EnableTrielessStateDB = true - require.NoError(t, errors.Cause(ValidateArchiveMode(cfg))) - cfg.Chain.EnableArchiveMode = true - cfg.Chain.EnableTrielessStateDB = false - require.NoError(t, errors.Cause(ValidateArchiveMode(cfg))) - cfg.Chain.EnableArchiveMode = false - cfg.Chain.EnableTrielessStateDB = false - require.NoError(t, errors.Cause(ValidateArchiveMode(cfg))) + cfg.Chain.VersionedMetadata = "meta" + for _, v := range [][2]bool{ + {false, false}, + {false, true}, + {true, false}, + {true, true}, + } { + cfg.Chain.EnableArchiveMode = v[0] + cfg.Chain.EnableTrielessStateDB = v[1] + if !(cfg.Chain.EnableArchiveMode && cfg.Chain.EnableTrielessStateDB) { + r.NoError(ValidateArchiveMode(cfg)) + continue + } + for _, v := range []struct { + ns []string + err string + }{ + {[]string{"", ""}, "State DB archive mode is enabled with empty versioned namespace"}, + {[]string{"Account", ""}, "State DB archive mode is enabled with empty versioned namespace"}, + {[]string{"", "Account"}, "State DB archive mode is enabled with empty versioned namespace"}, + {[]string{"Account", "Contract"}, ""}, + } { + cfg.Chain.VersionedNamespaces = v.ns + if len(v.err) > 0 { + r.ErrorContains(ValidateArchiveMode(cfg), v.err) + } else { + r.NoError(ValidateArchiveMode(cfg)) + } + } + cfg.Chain.VersionedMetadata = "" + r.ErrorContains(ValidateArchiveMode(cfg), "State DB archive mode is enabled with empty metadata namespace") + cfg.Chain.VersionedMetadata = "meta" + r.NoError(ValidateArchiveMode(cfg)) + } } func TestValidateActPool(t *testing.T) { diff --git a/db/builder.go b/db/builder.go index c8c249d983..310d14e647 100644 --- a/db/builder.go +++ b/db/builder.go @@ -32,3 +32,12 @@ func CreateKVStoreWithCache(cfg Config, dbPath string, cacheSize int) (KVStore, return NewKvStoreWithCache(dao, cacheSize), nil } + +// CreateKVStoreVersioned creates versioned db from config and db path +func CreateKVStoreVersioned(cfg Config, dbPath string, vns []string) (KVStore, error) { + if len(dbPath) == 0 { + return nil, ErrEmptyDBPath + } + cfg.DbPath = dbPath + return NewKVStoreWithVersion(cfg, VersionedNamespaceOption(vns...)), nil +} diff --git a/db/kvstore_versioned.go b/db/kvstore_versioned.go index e1e85b01b3..f55b30cea8 100644 --- a/db/kvstore_versioned.go +++ b/db/kvstore_versioned.go @@ -41,10 +41,35 @@ type ( KvVersioned interface { lifecycle.StartStopper + KVStore + + // Base returns the underlying KVStore + Base() KVStore + // Version returns the key's most recent version Version(string, []byte) (uint64, error) // SetVersion sets the version, and returns a KVStore to call Put()/Get() SetVersion(uint64) KVStore } + + // KvWithVersion wraps the versioned DB implementation with a certain version + KvWithVersion struct { + // code in PR4518 + } ) + +type Option func(*KvWithVersion) + +func VersionedNamespaceOption(ns ...string) Option { + return func(k *KvWithVersion) { + // code in PR4518 + } +} + +// NewKVStoreWithVersion implements a KVStore that can handle both versioned +// and non-versioned namespace +func NewKVStoreWithVersion(cfg Config, opts ...Option) KvVersioned { + // code in PR4518 + return nil +} diff --git a/state/factory/daoretrofitter.go b/state/factory/daoretrofitter.go index 7170fcabed..e8b9dfaa81 100644 --- a/state/factory/daoretrofitter.go +++ b/state/factory/daoretrofitter.go @@ -10,7 +10,10 @@ import ( "github.com/pkg/errors" + "github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm" + "github.com/iotexproject/iotex-core/v2/action/protocol/staking" "github.com/iotexproject/iotex-core/v2/db" + "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" ) @@ -47,3 +50,27 @@ func (rtf *daoRTF) getHeight() (uint64, error) { func (rtf *daoRTF) putHeight(h uint64) error { return rtf.dao.Put(AccountKVNamespace, []byte(CurrentHeightKey), byteutil.Uint64ToBytes(h)) } + +func (rtf *daoRTF) metadataNS() string { + return AccountKVNamespace +} + +func (rtf *daoRTF) flusherOptions(preEaster bool) []db.KVStoreFlusherOption { + opts := []db.KVStoreFlusherOption{ + db.SerializeOption(func(wi *batch.WriteInfo) []byte { + if preEaster { + return wi.SerializeWithoutWriteType() + } + return wi.Serialize() + }), + } + if !preEaster { + return opts + } + return append( + opts, + db.SerializeFilterOption(func(wi *batch.WriteInfo) bool { + return wi.Namespace() == evm.CodeKVNameSpace || wi.Namespace() == staking.CandsMapNS + }), + ) +} diff --git a/state/factory/daoretrofitter_archive.go b/state/factory/daoretrofitter_archive.go new file mode 100644 index 0000000000..fab2a1f73b --- /dev/null +++ b/state/factory/daoretrofitter_archive.go @@ -0,0 +1,88 @@ +// Copyright (c) 2024 IoTeX Foundation +// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability +// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed. +// This source code is governed by Apache License 2.0 that can be found in the LICENSE file. + +package factory + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm" + "github.com/iotexproject/iotex-core/v2/action/protocol/staking" + "github.com/iotexproject/iotex-core/v2/db" + "github.com/iotexproject/iotex-core/v2/db/batch" + "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" +) + +type daoRTFArchive struct { + daoVersioned db.KvVersioned + metaNS string // namespace for metadata +} + +func newDaoRetrofitterArchive(dao db.KvVersioned, mns string) *daoRTFArchive { + return &daoRTFArchive{ + daoVersioned: dao, + metaNS: mns, + } +} + +func (rtf *daoRTFArchive) Start(ctx context.Context) error { + return rtf.daoVersioned.Start(ctx) +} + +func (rtf *daoRTFArchive) Stop(ctx context.Context) error { + return rtf.daoVersioned.Stop(ctx) +} + +func (rtf *daoRTFArchive) atHeight(h uint64) db.KVStore { + return rtf.daoVersioned.SetVersion(h) +} + +func (rtf *daoRTFArchive) getHeight() (uint64, error) { + height, err := rtf.daoVersioned.Base().Get(rtf.metaNS, []byte(CurrentHeightKey)) + if err != nil { + return 0, errors.Wrap(err, "failed to get factory's height from underlying DB") + } + return byteutil.BytesToUint64(height), nil +} + +func (rtf *daoRTFArchive) putHeight(h uint64) error { + return rtf.daoVersioned.Base().Put(rtf.metaNS, []byte(CurrentHeightKey), byteutil.Uint64ToBytes(h)) +} + +func (rtf *daoRTFArchive) metadataNS() string { + return rtf.metaNS +} + +func (rtf *daoRTFArchive) flusherOptions(preEaster bool) []db.KVStoreFlusherOption { + opts := []db.KVStoreFlusherOption{ + db.SerializeOption(func(wi *batch.WriteInfo) []byte { + // current height is moved to the metadata namespace + // transform it back for the purpose of calculating digest + wi = rtf.transCurrentHeight(wi) + if preEaster { + return wi.SerializeWithoutWriteType() + } + return wi.Serialize() + }), + } + if !preEaster { + return opts + } + return append( + opts, + db.SerializeFilterOption(func(wi *batch.WriteInfo) bool { + return wi.Namespace() == evm.CodeKVNameSpace || wi.Namespace() == staking.CandsMapNS + }), + ) +} + +func (rtf *daoRTFArchive) transCurrentHeight(wi *batch.WriteInfo) *batch.WriteInfo { + if wi.Namespace() == rtf.metaNS && string(wi.Key()) == CurrentHeightKey { + return batch.NewWriteInfo(wi.WriteType(), AccountKVNamespace, wi.Key(), wi.Value(), wi.Error()) + } + return wi +} diff --git a/state/factory/statedb.go b/state/factory/statedb.go index 2e2be991aa..517ecb338f 100644 --- a/state/factory/statedb.go +++ b/state/factory/statedb.go @@ -20,8 +20,6 @@ import ( "github.com/iotexproject/iotex-core/v2/action" "github.com/iotexproject/iotex-core/v2/action/protocol" - "github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm" - "github.com/iotexproject/iotex-core/v2/action/protocol/staking" "github.com/iotexproject/iotex-core/v2/actpool" "github.com/iotexproject/iotex-core/v2/blockchain/block" "github.com/iotexproject/iotex-core/v2/blockchain/genesis" @@ -40,6 +38,8 @@ type ( atHeight(uint64) db.KVStore getHeight() (uint64, error) putHeight(uint64) error + metadataNS() string + flusherOptions(bool) []db.KVStoreFlusherOption } // stateDB implements StateFactory interface, tracks changes to account/contract and batch-commits to DB stateDB struct { @@ -53,6 +53,7 @@ type ( protocolView protocol.View skipBlockValidationOnPut bool ps *patchStore + metaNS string // metadata namespace, only meaningful when archive-mode enabled } ) @@ -91,6 +92,14 @@ func DisableWorkingSetCacheOption() StateDBOption { } } +// MetadataNamespaceOption specifies the metadat namespace for versioned DB +func MetadataNamespaceOption(ns string) StateDBOption { + return func(sdb *stateDB, cfg *Config) error { + sdb.metaNS = ns + return nil + } +} + // NewStateDB creates a new state db func NewStateDB(cfg Config, dao db.KVStore, opts ...StateDBOption) (Factory, error) { sdb := stateDB{ @@ -106,7 +115,15 @@ func NewStateDB(cfg Config, dao db.KVStore, opts ...StateDBOption) (Factory, err return nil, err } } - sdb.dao = newDaoRetrofitter(dao) + if cfg.Chain.EnableArchiveMode { + daoVersioned, ok := dao.(db.KvVersioned) + if !ok { + return nil, errors.Wrap(ErrNotSupported, "cannot enable archive mode StateDB with non-versioned DB") + } + sdb.dao = newDaoRetrofitterArchive(daoVersioned, sdb.metaNS) + } else { + sdb.dao = newDaoRetrofitter(dao) + } timerFactory, err := prometheustimer.New( "iotex_statefactory_perf", "Performance of state factory module", @@ -181,7 +198,7 @@ func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingS flusher, err := db.NewKVStoreFlusher( sdb.dao.atHeight(height), batch.NewCachedBatch(), - sdb.flusherOptions(!g.IsEaster(height))..., + sdb.dao.flusherOptions(!g.IsEaster(height))..., ) if err != nil { return nil, err @@ -193,11 +210,10 @@ func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingS flusher.KVStoreWithBuffer().MustPut(p.Namespace, p.Key, p.Value) } } - store := newStateDBWorkingSetStore(sdb.protocolView, flusher, g.IsNewfoundland(height)) + store := newStateDBWorkingSetStore(sdb.protocolView, flusher, g.IsNewfoundland(height), sdb.dao.metadataNS()) if err := store.Start(ctx); err != nil { return nil, err } - return newWorkingSet(height, store), nil } @@ -271,7 +287,6 @@ func (sdb *stateDB) WorkingSet(ctx context.Context) (protocol.StateManager, erro } func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) { - // TODO: implement archive mode return sdb.newWorkingSet(ctx, height) } @@ -368,29 +383,9 @@ func (sdb *stateDB) ReadView(name string) (interface{}, error) { } //====================================== -// private trie constructor functions +// private statedb functions //====================================== -func (sdb *stateDB) flusherOptions(preEaster bool) []db.KVStoreFlusherOption { - opts := []db.KVStoreFlusherOption{ - db.SerializeOption(func(wi *batch.WriteInfo) []byte { - if preEaster { - return wi.SerializeWithoutWriteType() - } - return wi.Serialize() - }), - } - if !preEaster { - return opts - } - return append( - opts, - db.SerializeFilterOption(func(wi *batch.WriteInfo) bool { - return wi.Namespace() == evm.CodeKVNameSpace || wi.Namespace() == staking.CandsMapNS - }), - ) -} - func (sdb *stateDB) state(h uint64, ns string, addr []byte, s interface{}) error { data, err := sdb.dao.atHeight(h).Get(ns, addr) if err != nil { diff --git a/state/factory/workingsetstore_statedb.go b/state/factory/workingsetstore_statedb.go index 4f96237204..82c93c20bb 100644 --- a/state/factory/workingsetstore_statedb.go +++ b/state/factory/workingsetstore_statedb.go @@ -19,15 +19,17 @@ import ( type stateDBWorkingSetStore struct { *workingSetStoreCommon readBuffer bool + metaNS string // metadata namespace, only meaningful when archive-mode enabled } -func newStateDBWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher, readBuffer bool) workingSetStore { +func newStateDBWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher, readBuffer bool, ns string) workingSetStore { return &stateDBWorkingSetStore{ workingSetStoreCommon: &workingSetStoreCommon{ flusher: flusher, view: view, }, readBuffer: readBuffer, + metaNS: ns, } } @@ -61,7 +63,7 @@ func (store *stateDBWorkingSetStore) States(ns string, keys [][]byte) ([][]byte, func (store *stateDBWorkingSetStore) Finalize(height uint64) error { // Persist current chain Height store.flusher.KVStoreWithBuffer().MustPut( - AccountKVNamespace, + store.metaNS, []byte(CurrentHeightKey), byteutil.Uint64ToBytes(height), ) diff --git a/state/factory/workingsetstore_test.go b/state/factory/workingsetstore_test.go index cf38df4ef6..97c52a7c1e 100644 --- a/state/factory/workingsetstore_test.go +++ b/state/factory/workingsetstore_test.go @@ -12,12 +12,25 @@ import ( "testing" "github.com/iotexproject/iotex-core/v2/action/protocol" + "github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm" "github.com/iotexproject/iotex-core/v2/db" "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" "github.com/stretchr/testify/require" ) +var ( + name = "name" + viewValue = "value" + namespace = "namespace" + key1 = []byte("key1") + value1 = []byte("value1") + key2 = []byte("key2") + value2 = []byte("value2") + key3 = []byte("key3") + value3 = []byte("value3") +) + func TestStateDBWorkingSetStore(t *testing.T) { require := require.New(t) ctx := context.Background() @@ -25,18 +38,9 @@ func TestStateDBWorkingSetStore(t *testing.T) { inMemStore := db.NewMemKVStore() flusher, err := db.NewKVStoreFlusher(inMemStore, batch.NewCachedBatch()) require.NoError(err) - store := newStateDBWorkingSetStore(view, flusher, true) + store := newStateDBWorkingSetStore(view, flusher, true, AccountKVNamespace) require.NotNil(store) require.NoError(store.Start(ctx)) - name := "name" - viewValue := "value" - namespace := "namespace" - key1 := []byte("key1") - value1 := []byte("value1") - key2 := []byte("key2") - value2 := []byte("value2") - key3 := []byte("key3") - value3 := []byte("value3") t.Run("test view", func(t *testing.T) { _, err := store.ReadView(name) require.Error(err) @@ -109,6 +113,45 @@ func TestStateDBWorkingSetStore(t *testing.T) { require.NoError(store.Stop(ctx)) } +func TestVersionedWorkingSetStore(t *testing.T) { + r := require.New(t) + var ( + mns = "mta" + stores = []workingSetStore{} + digest = [2]string{ + "e6958faedcc37528dad9ac99f5e6613fbefbf403a06fe962535225d42a27b189", + "bb262ac0603e48aa737f5eb42014f481cb54d831c14fe736b8f61b69e5b4924a", + } + ) + for _, preEaster := range []bool{false, true} { + for _, versioned := range []bool{true, false} { + for _, ns := range []string{mns, "test1", "test can pass with any string here"} { + var rtf daoRetrofitter + if versioned { + rtf = newDaoRetrofitterArchive(nil, ns) + } else { + rtf = newDaoRetrofitter(nil) + } + flusher, err := db.NewKVStoreFlusher(db.NewMemKVStore(), batch.NewCachedBatch(), rtf.flusherOptions(preEaster)...) + r.NoError(err) + stores = append(stores, newStateDBWorkingSetStore(nil, flusher, true, rtf.metadataNS())) + } + } + } + for i, store := range stores { + r.NotNil(store) + r.NoError(store.Put(namespace, key1, value1)) + r.NoError(store.Put(namespace, key2, value2)) + r.NoError(store.Put(namespace, []byte(CurrentHeightKey), value3)) + r.NoError(store.Put(mns, key1, value1)) + r.NoError(store.Delete(namespace, key2)) + r.NoError(store.Put(evm.CodeKVNameSpace, key3, value1)) + r.NoError(store.Finalize(3)) + h := store.Digest() + r.Equal(digest[i/6], hex.EncodeToString(h[:])) + } +} + func TestFactoryWorkingSetStore(t *testing.T) { // TODO: add unit test for factory working set store }